2 Star 0 Fork 32

我们17走/gazelle

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0258-cleancode-move-some-API-from-stack-to-rpc-and-rtw.patch 82.63 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604
From 7e4d7c638681df4f32c0d719c62b4e38ef69c1eb Mon Sep 17 00:00:00 2001
From: Lemmy Huang <huangliming5@huawei.com>
Date: Sat, 31 Aug 2024 14:51:41 +0800
Subject: [PATCH] cleancode: move some API from stack to rpc and rtw
Signed-off-by: Lemmy Huang <huangliming5@huawei.com>
---
src/lstack/api/lstack_epoll.c | 17 +-
src/lstack/api/lstack_rtc_api.c | 7 +-
src/lstack/api/lstack_rtw_api.c | 250 ++++++-
src/lstack/api/lstack_wrap.c | 1 -
src/lstack/core/lstack_dpdk.c | 5 +-
src/lstack/core/lstack_lwip.c | 47 +-
src/lstack/core/lstack_protocol_stack.c | 757 ++-------------------
src/lstack/core/lstack_thread_rpc.c | 610 +++++++++++++----
src/lstack/include/lstack_dpdk.h | 33 +-
src/lstack/include/lstack_epoll.h | 12 +-
src/lstack/include/lstack_lwip.h | 9 +-
src/lstack/include/lstack_protocol_stack.h | 51 +-
src/lstack/include/lstack_rpc_proc.h | 47 --
src/lstack/include/lstack_thread_rpc.h | 81 +--
src/lstack/netif/lstack_ethdev.c | 56 ++
15 files changed, 927 insertions(+), 1056 deletions(-)
delete mode 100644 src/lstack/include/lstack_rpc_proc.h
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index 1c13076..ce3d267 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -19,15 +19,9 @@
#include <stdatomic.h>
#include <pthread.h>
-#include <lwip/lwipgz_sock.h>
#include <lwip/sockets.h>
-#include <lwip/lwipgz_event.h>
-#include <lwip/api.h>
-#include <lwip/tcp.h>
-#include <lwip/timeouts.h>
#include <lwip/lwipgz_posix_api.h>
-#include "lstack_ethdev.h"
#include "lstack_stack_stat.h"
#include "lstack_cfg.h"
#include "lstack_log.h"
@@ -306,6 +300,17 @@ int32_t lstack_epoll_create(int32_t flags)
return lstack_do_epoll_create(fd);
}
+static void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct protocol_stack *stack = NULL;
+
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ rpc_call_clean_epoll(&stack->rpc_queue, wakeup);
+ }
+}
+
int32_t lstack_epoll_close(int32_t fd)
{
struct lwip_sock *sock = lwip_get_socket(fd);
diff --git a/src/lstack/api/lstack_rtc_api.c b/src/lstack/api/lstack_rtc_api.c
index 7689c83..60d3b23 100644
--- a/src/lstack/api/lstack_rtc_api.c
+++ b/src/lstack/api/lstack_rtc_api.c
@@ -42,11 +42,6 @@ static int rtc_close(int s)
return lwip_close(s);
}
-static int rtc_shutdown(int fd, int how)
-{
- return lwip_shutdown(fd, how);
-}
-
static int rtc_epoll_create(int flags)
{
if (stack_setup_app_thread() < 0) {
@@ -90,7 +85,7 @@ static int rtc_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *excep
void rtc_api_init(posix_api_t *api)
{
api->close_fn = rtc_close;
- api->shutdown_fn = rtc_shutdown;
+ api->shutdown_fn = lwip_shutdown;
api->socket_fn = rtc_socket;
api->accept_fn = lwip_accept;
api->accept4_fn = lwip_accept4;
diff --git a/src/lstack/api/lstack_rtw_api.c b/src/lstack/api/lstack_rtw_api.c
index d8634cc..7ceff20 100644
--- a/src/lstack/api/lstack_rtw_api.c
+++ b/src/lstack/api/lstack_rtw_api.c
@@ -13,14 +13,258 @@
#include <lwip/lwipgz_sock.h>
#include <lwip/sockets.h>
+#include "common/gazelle_base_func.h"
+#include "lstack_log.h"
+#include "lstack_cfg.h"
#include "lstack_thread_rpc.h"
-#include "lstack_epoll.h"
#include "lstack_protocol_stack.h"
-#include "lstack_cfg.h"
#include "lstack_lwip.h"
-#include "common/gazelle_base_func.h"
+#include "lstack_epoll.h"
#include "lstack_rtw_api.h"
+/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
+static int stack_broadcast_close(int fd)
+{
+ int ret = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ do {
+ sock = sock->listen_next;
+ if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) {
+ ret = -1;
+ }
+
+ if (POSIX_IS_CLOSED(sock)) {
+ break;
+ }
+ fd = sock->conn->callback_arg.socket;
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
+
+ return ret;
+}
+
+static int stack_broadcast_shutdown(int fd, int how)
+{
+ int32_t ret = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ do {
+ sock = sock->listen_next;
+ if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) {
+ ret = -1;
+ }
+
+ if (POSIX_IS_CLOSED(sock)) {
+ break;
+ }
+ fd = sock->conn->callback_arg.socket;
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
+
+ return ret;
+}
+
+/* choice one stack bind */
+static int stack_single_bind(int fd, const struct sockaddr *name, socklen_t namelen)
+{
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_bind(&stack->rpc_queue, fd, name, namelen);
+}
+
+/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
+static int stack_broadcast_bind(int fd, const struct sockaddr *name, socklen_t namelen)
+{
+ struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+ struct protocol_stack *stack = NULL;
+ int ret, clone_fd;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
+ }
+
+ ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen);
+ if (ret < 0) {
+ close(fd);
+ return ret;
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ for (int i = 0; i < stack_group->stack_num; ++i) {
+ stack = stack_group->stacks[i];
+ if (stack != cur_stack) {
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen);
+ if (clone_fd < 0) {
+ stack_broadcast_close(fd);
+ return clone_fd;
+ }
+ }
+ }
+ return 0;
+}
+
+static void inline del_accept_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock->events &= ~EPOLLIN;
+ if (sock->events == 0) {
+ list_del_node(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+static struct lwip_sock *get_min_accept_sock(int fd)
+{
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct lwip_sock *min_sock = NULL;
+
+ while (sock) {
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock = sock->listen_next;
+ continue;
+ }
+
+ if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
+ min_sock = sock;
+ }
+
+ sock = sock->listen_next;
+ }
+
+ return min_sock;
+}
+
+/* ergodic the protocol stack thread to find the connection, because all threads are listening */
+static int stack_broadcast_accept4(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+ int ret = -1;
+ struct lwip_sock *min_sock = NULL;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = NULL;
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ if (netconn_is_nonblocking(sock->conn)) {
+ min_sock = get_min_accept_sock(fd);
+ } else {
+ while ((min_sock = get_min_accept_sock(fd)) == NULL) {
+ lstack_block_wait(sock->wakeup, 0);
+ }
+ }
+
+ if (min_sock && min_sock->conn) {
+ stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags);
+ }
+
+ if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
+ del_accept_in_event(min_sock);
+ }
+
+ if (ret < 0) {
+ errno = EAGAIN;
+ }
+ return ret;
+}
+
+static int stack_broadcast_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
+{
+ if (get_global_cfg_params()->nonblock_mode)
+ return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK);
+ else
+ return stack_broadcast_accept4(fd, addr, addrlen, 0);
+}
+
+/* choice one stack listen */
+static int stack_single_listen(int fd, int backlog)
+{
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_listen(&stack->rpc_queue, fd, backlog);
+}
+
+/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
+static int stack_broadcast_listen(int fd, int backlog)
+{
+ typedef union sockaddr_union {
+ struct sockaddr sa;
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ } sockaddr_t;
+
+ struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+ struct protocol_stack *stack = NULL;
+ sockaddr_t addr;
+ socklen_t addr_len = sizeof(addr);
+ int ret, clone_fd;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
+ }
+
+ ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len);
+ if (ret != 0) {
+ return ret;
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ int min_conn_stk_idx = get_min_conn_stack(stack_group);
+
+ for (int32_t i = 0; i < stack_group->stack_num; ++i) {
+ stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
+ continue;
+ }
+ if (stack != cur_stack) {
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len);
+ if (clone_fd < 0) {
+ stack_broadcast_close(fd);
+ return clone_fd;
+ }
+ } else {
+ clone_fd = fd;
+ }
+
+ if (min_conn_stk_idx == i) {
+ lwip_get_socket(clone_fd)->conn->is_master_fd = 1;
+ } else {
+ lwip_get_socket(clone_fd)->conn->is_master_fd = 0;
+ }
+
+ ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog);
+ if (ret < 0) {
+ stack_broadcast_close(fd);
+ return ret;
+ }
+ }
+ return 0;
+}
+
static int rtw_socket(int domain, int type, int protocol)
{
struct protocol_stack *stack = get_bind_protocol_stack();
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 1d5529d..8f80f98 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -14,7 +14,6 @@
#include <ifaddrs.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
-#include <net/if.h>
#include <lwip/lwipgz_posix_api.h>
#include <lwip/lwipgz_sock.h>
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 50fbdf6..f87e362 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -43,7 +43,6 @@
#include "lstack_log.h"
#include "common/dpdk_common.h"
#include "lstack_protocol_stack.h"
-#include "lstack_thread_rpc.h"
#include "lstack_lwip.h"
#include "lstack_cfg.h"
#include "lstack_virtio.h"
@@ -765,9 +764,9 @@ static int dpdk_bond_create(uint8_t mode, int *slave_port_id, int count)
return 0;
}
-int32_t init_dpdk_ethdev(void)
+int init_dpdk_ethdev(void)
{
- int32_t ret;
+ int ret;
int slave_port_id[GAZELLE_MAX_BOND_NUM];
int port_id = 0;
struct cfg_params *cfg = get_global_cfg_params();
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 89142a4..3454961 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -29,15 +29,12 @@
#include <lwip/prot/etharp.h>
#include "common/gazelle_base_func.h"
-#include "lstack_ethdev.h"
-#include "lstack_protocol_stack.h"
#include "lstack_log.h"
-#include "lstack_dpdk.h"
+#include "lstack_cfg.h"
+#include "lstack_protocol_stack.h"
#include "lstack_stack_stat.h"
#include "lstack_epoll.h"
-#include "lstack_thread_rpc.h"
-#include "common/dpdk_common.h"
-#include "lstack_cfg.h"
+#include "lstack_dpdk.h"
#include "lstack_lwip.h"
static const uint8_t fin_packet = 0;
@@ -841,43 +838,24 @@ ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, siz
return act_len;
}
-PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0};
-PER_THREAD uint16_t max_sock_stack = 0;
-
-static inline void thread_bind_stack(struct lwip_sock *sock)
-{
- if (likely(sock->already_bind_numa || !sock->stack)) {
- return;
- }
- sock->already_bind_numa = 1;
-
- if (get_global_cfg_params()->app_bind_numa == 0) {
- return;
- }
-
- stack_sock_num[sock->stack->stack_idx]++;
- if (stack_sock_num[sock->stack->stack_idx] > max_sock_stack) {
- max_sock_stack = stack_sock_num[sock->stack->stack_idx];
- bind_to_stack_numa(sock->stack);
- }
-}
-
ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t flags,
const struct sockaddr *addr, socklen_t addrlen)
{
+ struct lwip_sock *sock;
ssize_t send = 0;
-
+
if (buf == NULL) {
GAZELLE_RETURN(EINVAL);
}
-
if (addr && addr->sa_family != AF_INET && addr->sa_family != AF_INET6) {
GAZELLE_RETURN(EINVAL);
}
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- thread_bind_stack(sock);
+ sock = lwip_get_socket(fd);
+ if (unlikely(sock->already_bind_numa == 0 && sock->stack)) {
+ thread_bind_stack(sock->stack);
+ sock->already_bind_numa = 1;
+ }
if (sock->same_node_tx_ring != NULL) {
return gazelle_same_node_ring_send(sock, buf, len, flags);
@@ -1131,7 +1109,10 @@ ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags
return -1;
}
- thread_bind_stack(sock);
+ if (unlikely(sock->already_bind_numa == 0 && sock->stack)) {
+ thread_bind_stack(sock->stack);
+ sock->already_bind_numa = 1;
+ }
if (sock->same_node_rx_ring != NULL) {
return gazelle_same_node_ring_recv(sock, buf, len, flags);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index f56e911..bcca1a7 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -22,14 +22,12 @@
#include <lwip/lwipgz_posix_api.h>
#include "common/gazelle_base_func.h"
-#include "lstack_thread_rpc.h"
#include "common/dpdk_common.h"
#include "lstack_log.h"
+#include "lstack_cfg.h"
#include "lstack_dpdk.h"
#include "lstack_ethdev.h"
-#include "lstack_vdev.h"
#include "lstack_lwip.h"
-#include "lstack_cfg.h"
#include "lstack_control_plane.h"
#include "lstack_epoll.h"
#include "lstack_stack_stat.h"
@@ -64,18 +62,6 @@ static void stack_wait_quit(struct protocol_stack *stack)
}
}
-void bind_to_stack_numa(struct protocol_stack *stack)
-{
- int32_t ret;
- pthread_t tid = pthread_self();
-
- ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id);
- return;
- }
-}
-
static inline void set_stack_idx(uint16_t idx)
{
g_stack_p = g_stack_group.stacks[idx];
@@ -97,27 +83,6 @@ struct protocol_stack_group *get_protocol_stack_group(void)
return &g_stack_group;
}
-int get_min_conn_stack(struct protocol_stack_group *stack_group)
-{
- int min_conn_stk_idx = 0;
- int min_conn_num = GAZELLE_MAX_CLIENTS;
- for (int i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack* stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv) {
- if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
- min_conn_stk_idx = i;
- min_conn_num = stack->conn_num;
- }
- } else {
- if (stack->conn_num < min_conn_num) {
- min_conn_stk_idx = i;
- min_conn_num = stack->conn_num;
- }
- }
- }
- return min_conn_stk_idx;
-}
-
struct protocol_stack *get_protocol_stack(void)
{
return g_stack_p;
@@ -179,6 +144,57 @@ struct protocol_stack *get_bind_protocol_stack(void)
return stack_group->stacks[index];
}
+int get_min_conn_stack(struct protocol_stack_group *stack_group)
+{
+ struct protocol_stack* stack;
+ int min_conn_stk_idx = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
+
+ for (int i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ } else {
+ if (stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }
+ }
+ return min_conn_stk_idx;
+}
+
+void bind_to_stack_numa(struct protocol_stack *stack)
+{
+ int32_t ret;
+ pthread_t tid = pthread_self();
+
+ ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id);
+ return;
+ }
+}
+
+void thread_bind_stack(struct protocol_stack *stack)
+{
+ static PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0};
+ static PER_THREAD uint16_t max_sock_stack = 0;
+
+ if (get_global_cfg_params()->app_bind_numa == 0) {
+ return;
+ }
+
+ stack_sock_num[stack->stack_idx]++;
+ if (stack_sock_num[stack->stack_idx] > max_sock_stack) {
+ max_sock_stack = stack_sock_num[stack->stack_idx];
+ bind_to_stack_numa(stack);
+ }
+}
+
static uint32_t get_protocol_traffic(struct protocol_stack *stack)
{
if (use_ltran()) {
@@ -232,6 +248,11 @@ void low_power_idling(struct protocol_stack *stack)
}
}
+struct thread_params {
+ uint16_t queue_id;
+ uint16_t idx;
+};
+
static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func)
{
/* thread may run slow, if arg is temp var maybe have relese */
@@ -373,7 +394,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
return 0;
}
-void wait_sem_value(sem_t *sem, int32_t wait_value)
+static void wait_sem_value(sem_t *sem, int32_t wait_value)
{
int32_t sem_val;
do {
@@ -546,7 +567,7 @@ static void* gazelle_stack_thread(void *arg)
return NULL;
}
-int32_t stack_group_init_mempool(void)
+static int stack_group_init_mempool(void)
{
struct cfg_params *cfg_params = get_global_cfg_params();
uint32_t total_mbufs = 0;
@@ -702,655 +723,9 @@ OUT2:
return -1;
}
-void stack_arp(struct rpc_msg *msg)
-{
- struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p;
- struct protocol_stack *stack = get_protocol_stack();
-
- eth_dev_recv(mbuf, stack);
-}
-
-void stack_socket(struct rpc_msg *msg)
-{
- msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
- if (msg->result < 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
- }
-}
-
-void stack_close(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct lwip_sock *sock = lwip_get_socket(fd);
-
- if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
- return;
- }
-
- msg->result = lwip_close(fd);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_shutdown(struct rpc_msg *msg)
-{
- int fd = msg->args[MSG_ARG_0].i;
- int how = msg->args[MSG_ARG_1].i;
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct lwip_sock *sock = lwip_get_socket(fd);
-
- if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- return;
- }
-
- msg->result = lwip_shutdown(fd, how);
- if (msg->result != 0 && errno != ENOTCONN) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result);
- }
-
- posix_api->shutdown_fn(fd, how);
-}
-
-void stack_bind(struct rpc_msg *msg)
-{
- msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_listen(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- int32_t backlog = msg->args[MSG_ARG_1].i;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL) {
- msg->result = -1;
- return;
- }
-
- /* new listen add to stack listen list */
- msg->result = lwip_listen(fd, backlog);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_accept(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- msg->result = -1;
- struct protocol_stack *stack = get_protocol_stack();
-
- int32_t accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, msg->args[MSG_ARG_3].i);
- if (accept_fd < 0) {
- stack->stats.accept_fail++;
- LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
- return;
- }
-
- struct lwip_sock *sock = lwip_get_socket(accept_fd);
- if (sock == NULL || sock->stack == NULL) {
- lwip_close(accept_fd);
- LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
- return;
- }
-
- msg->result = accept_fd;
- sock->stack->conn_num++;
- if (rte_ring_count(sock->conn->recvmbox->ring)) {
- do_lwip_add_recvlist(accept_fd);
- }
-}
-
-void stack_connect(struct rpc_msg *msg)
-{
- msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
- if (msg->result < 0) {
- msg->result = -errno;
- }
-}
-
-void stack_getpeername(struct rpc_msg *msg)
-{
- msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_getsockname(struct rpc_msg *msg)
-{
- msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_getsockopt(struct rpc_msg *msg)
-{
- msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
- msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
- }
-}
-
-void stack_setsockopt(struct rpc_msg *msg)
-{
- msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
- msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
- }
-}
-
-void stack_fcntl(struct rpc_msg *msg)
-{
- msg->result = lwip_fcntl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].l);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_ioctl(struct rpc_msg *msg)
-{
- msg->result = lwip_ioctl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].l, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_recv(struct rpc_msg *msg)
-{
- msg->result = lwip_recv(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].size,
- msg->args[MSG_ARG_3].i);
-}
-
-void stack_tcp_send(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
- struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
- return;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if (replenish_again < 0) {
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
- }
-
- if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
- if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- return;
- }
- }
-
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
-}
-
-void stack_udp_send(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
- struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
- return;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
- rpc_call_replenish(&stack->rpc_queue, sock);
- return;
- }
-
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
-}
-
-/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
-void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
-{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct rte_mbuf *mbuf_copy = NULL;
- struct protocol_stack *stack = NULL;
- int32_t ret;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- if (cur_stack == stack) {
- continue;
- }
-
- /* stack maybe not init in app thread yet */
- if (stack == NULL || !(netif_is_up(&stack->netif))) {
- continue;
- }
-
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
-
- ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
- if (ret != 0) {
- rte_pktmbuf_free(mbuf_copy);
- return;
- }
- }
-#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
- if (get_global_cfg_params()->kni_switch) {
- ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- cur_stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
- kni_handle_tx(mbuf_copy);
- }
-#endif
- if (get_global_cfg_params()->flow_bifurcation) {
- ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- cur_stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
- virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy);
- }
- return;
-}
-
-void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
-{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct protocol_stack *stack = NULL;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- rpc_call_clean_epoll(&stack->rpc_queue, wakeup);
- }
-}
-
-void stack_clean_epoll(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
-
- list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
-}
-
-void stack_mempool_size(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
-
- msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool);
-}
-
-void stack_create_shadow_fd(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- struct sockaddr *addr = msg->args[MSG_ARG_1].p;
- socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
-
- int32_t clone_fd = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
- msg->result = -1;
- return;
- }
-
- int domain = addr->sa_family;
- int type = NETCONN_IS_UDP(sock) ? SOCK_DGRAM : SOCK_STREAM;
- clone_fd = lwip_socket(domain, type, 0);
- if (clone_fd < 0) {
- LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
- msg->result = clone_fd;
- return;
- }
-
- struct lwip_sock *clone_sock = lwip_get_socket(clone_fd);
- if (clone_sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
- msg->result = -1;
- return;
- }
-
- do_lwip_clone_sockopt(clone_sock, sock);
-
- while (sock->listen_next) {
- sock = sock->listen_next;
- }
- sock->listen_next = clone_sock;
-
- int32_t ret = lwip_bind(clone_fd, addr, addr_len);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno);
- msg->result = ret;
- return;
- }
-
- msg->result = clone_fd;
-}
-
-void stack_replenish_sendring(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
-
- msg->result = do_lwip_replenish_sendring(stack, sock);
- if (msg->result == true) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- }
-}
-
-void stack_get_conntable(struct rpc_msg *msg)
-{
- struct gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p;
- uint32_t max_num = msg->args[MSG_ARG_1].u;
-
- msg->result = do_lwip_get_conntable(conn, max_num);
-}
-
-void stack_get_connnum(struct rpc_msg *msg)
-{
- msg->result = do_lwip_get_connnum();
-}
-
-void stack_recvlist_count(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct list_node *list = &stack->recv_list;
- uint32_t count = 0;
- struct list_node *node;
- struct list_node *temp;
-
- list_for_each_node(node, temp, list) {
- count++;
- }
-
- msg->result = count;
-}
-
-/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
-int32_t stack_broadcast_close(int32_t fd)
-{
- int32_t ret = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- do {
- sock = sock->listen_next;
- if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) {
- ret = -1;
- }
-
- if (POSIX_IS_CLOSED(sock)) {
- break;
- }
- fd = sock->conn->callback_arg.socket;
- stack = get_protocol_stack_by_fd(fd);
- } while (1);
-
- return ret;
-}
-
-int stack_broadcast_shutdown(int fd, int how)
-{
- int32_t ret = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- do {
- sock = sock->listen_next;
- if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) {
- ret = -1;
- }
-
- if (POSIX_IS_CLOSED(sock)) {
- break;
- }
- fd = sock->conn->callback_arg.socket;
- stack = get_protocol_stack_by_fd(fd);
- } while (1);
-
- return ret;
-}
-
-/* choice one stack listen */
-int32_t stack_single_listen(int32_t fd, int32_t backlog)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- return rpc_call_listen(&stack->rpc_queue, fd, backlog);
-}
-
-/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
-{
- typedef union sockaddr_union {
- struct sockaddr sa;
- struct sockaddr_in in;
- struct sockaddr_in6 in6;
- } sockaddr_t;
-
- struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
- struct protocol_stack *stack = NULL;
- sockaddr_t addr;
- socklen_t addr_len = sizeof(addr);
- int32_t ret, clone_fd;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL || cur_stack == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EBADF);
- }
-
- ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len);
- if (ret != 0) {
- return ret;
- }
-
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- int min_conn_stk_idx = get_min_conn_stack(stack_group);
-
- for (int32_t i = 0; i < stack_group->stack_num; ++i) {
- stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
- continue;
- }
- if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len);
- if (clone_fd < 0) {
- stack_broadcast_close(fd);
- return clone_fd;
- }
- } else {
- clone_fd = fd;
- }
-
- if (min_conn_stk_idx == i) {
- lwip_get_socket(clone_fd)->conn->is_master_fd = 1;
- } else {
- lwip_get_socket(clone_fd)->conn->is_master_fd = 0;
- }
-
- ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog);
- if (ret < 0) {
- stack_broadcast_close(fd);
- return ret;
- }
- }
- return 0;
-}
-
-static struct lwip_sock *get_min_accept_sock(int32_t fd)
-{
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct lwip_sock *min_sock = NULL;
-
- while (sock) {
- if (!NETCONN_IS_ACCEPTIN(sock)) {
- sock = sock->listen_next;
- continue;
- }
-
- if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
- min_sock = sock;
- }
-
- sock = sock->listen_next;
- }
-
- return min_sock;
-}
-
-static void inline del_accept_in_event(struct lwip_sock *sock)
-{
- pthread_spin_lock(&sock->wakeup->event_list_lock);
-
- if (!NETCONN_IS_ACCEPTIN(sock)) {
- sock->events &= ~EPOLLIN;
- if (sock->events == 0) {
- list_del_node(&sock->event_list);
- }
- }
-
- pthread_spin_unlock(&sock->wakeup->event_list_lock);
-}
-
-/* choice one stack bind */
-int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- return rpc_call_bind(&stack->rpc_queue, fd, name, namelen);
-}
-
-/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
-{
- struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
- struct protocol_stack *stack = NULL;
- int32_t ret, clone_fd;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL || cur_stack == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EBADF);
- }
-
- ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen);
- if (ret < 0) {
- close(fd);
- return ret;
- }
-
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- for (int32_t i = 0; i < stack_group->stack_num; ++i) {
- stack = stack_group->stacks[i];
- if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen);
- if (clone_fd < 0) {
- stack_broadcast_close(fd);
- return clone_fd;
- }
- }
- }
- return 0;
-}
-
-/* ergodic the protocol stack thread to find the connection, because all threads are listening */
-int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
-{
- int32_t ret = -1;
- struct lwip_sock *min_sock = NULL;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = NULL;
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- if (netconn_is_nonblocking(sock->conn)) {
- min_sock = get_min_accept_sock(fd);
- } else {
- while ((min_sock = get_min_accept_sock(fd)) == NULL) {
- lstack_block_wait(sock->wakeup, 0);
- }
- }
-
- if (min_sock && min_sock->conn) {
- stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags);
- }
-
- if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
- del_accept_in_event(min_sock);
- }
-
- if (ret < 0) {
- errno = EAGAIN;
- }
- return ret;
-}
-
-int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
-{
- if (get_global_cfg_params()->nonblock_mode)
- return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK);
- else
- return stack_broadcast_accept4(fd, addr, addrlen, 0);
-}
-
-static void stack_all_fds_close(void)
+void stack_exit(void)
{
+ /* close all fd */
for (int i = 3; i < GAZELLE_MAX_CLIENTS + GAZELLE_RESERVED_CLIENTS; i++) {
struct lwip_sock *sock = lwip_get_socket(i);
if (!POSIX_IS_CLOSED(sock) && sock->stack == get_protocol_stack()) {
@@ -1359,16 +734,6 @@ static void stack_all_fds_close(void)
}
}
-static void stack_exit(void)
-{
- stack_all_fds_close();
-}
-
-void stack_exit_by_rpc(struct rpc_msg *msg)
-{
- stack_exit();
-}
-
void stack_group_exit(void)
{
int i;
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 8ac06cb..3e9889a 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -13,22 +13,27 @@
#include <lwip/lwipgz_sock.h>
#include <rte_mempool.h>
+#include "lwip/lwipgz_posix_api.h"
+
#include "lstack_log.h"
#include "lstack_cfg.h"
#include "lstack_dpdk.h"
-#include "lstack_rpc_proc.h"
#include "lstack_stack_stat.h"
#include "lstack_protocol_stack.h"
#include "lstack_thread_rpc.h"
+#include "lstack_epoll.h"
+#include "lstack_lwip.h"
static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
static struct rpc_stats g_rpc_stats;
+
struct rpc_stats *rpc_stats_get(void)
{
return &g_rpc_stats;
}
-static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
+__rte_always_inline
+static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
{
int ret;
struct rpc_msg *msg = NULL;
@@ -42,23 +47,11 @@ static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct
static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool)
{
- msg->rpcpool = pool;
- pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
- msg->func = func;
- msg->sync_flag = 1;
+ msg->func = func;
+ msg->rpcpool = pool;
+ msg->sync_flag = 1;
msg->recall_flag = 0;
-}
-
-static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
-{
- struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
- if (msg == NULL) {
- return NULL;
- }
-
- rpc_msg_init(msg, func, NULL);
-
- return msg;
+ pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
}
static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
@@ -92,9 +85,27 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
return msg;
}
-static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
+__rte_always_inline
+static void rpc_msg_free(struct rpc_msg *msg)
+{
+ pthread_spin_destroy(&msg->lock);
+ if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) {
+ rte_mempool_put(msg->rpcpool->mempool, (void *)msg);
+ } else {
+ free(msg);
+ }
+}
+
+__rte_always_inline
+static void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
{
- int32_t ret;
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
+}
+
+__rte_always_inline
+static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
+{
+ int ret;
pthread_spin_trylock(&msg->lock);
rpc_call(queue, msg);
@@ -107,12 +118,39 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *qu
return ret;
}
-int32_t rpc_msgcnt(rpc_queue *queue)
+int rpc_msgcnt(rpc_queue *queue)
{
return lockless_queue_count(queue);
}
-int rpc_poll_msg(rpc_queue *queue, uint32_t max_num)
+static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
+{
+ struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
+ if (msg == NULL) {
+ return NULL;
+ }
+
+ rpc_msg_init(msg, func, NULL);
+ return msg;
+}
+
+static void stack_exit_by_rpc(struct rpc_msg *msg)
+{
+ stack_exit();
+}
+
+int rpc_call_stack_exit(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ rpc_call(queue, msg);
+ return 0;
+}
+
+int rpc_poll_msg(rpc_queue *queue, int max_num)
{
int force_quit = 0;
struct rpc_msg *msg = NULL;
@@ -149,101 +187,165 @@ int rpc_poll_msg(rpc_queue *queue, uint32_t max_num)
return force_quit;
}
-int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn)
+
+static void callback_socket(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_get_conntable);
- if (msg == NULL) {
- return -1;
+ msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
+ if (msg->result < 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
}
-
- msg->args[MSG_ARG_0].p = conn_table;
- msg->args[MSG_ARG_1].u = max_conn;
-
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connnum(rpc_queue *queue)
+static void callback_close(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_get_connnum);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ struct lwip_sock *sock = lwip_get_socket(fd);
+
+ if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
+ return;
}
- return rpc_sync_call(queue, msg);
+ msg->result = lwip_close(fd);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
}
-int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+static void callback_shutdown(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_create_shadow_fd);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ int how = msg->args[MSG_ARG_1].i;
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ struct lwip_sock *sock = lwip_get_socket(fd);
+
+ if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->result = lwip_shutdown(fd, how);
+ if (msg->result != 0 && errno != ENOTCONN) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result);
+ }
- return rpc_sync_call(queue, msg);
+ posix_api->shutdown_fn(fd, how);
}
-int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn)
+static void callback_bind(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1);
- if (msg == NULL) {
- return -1;
+ msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
- msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn)
+static void callback_listen(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ int backlog = msg->args[MSG_ARG_1].i;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL) {
+ msg->result = -1;
+ return;
+ }
+
+ /* new listen add to stack listen list */
+ msg->result = lwip_listen(fd, backlog);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
- msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_mbufpoolsize(rpc_queue *queue)
+static void callback_create_shadow_fd(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_mempool_size);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ struct sockaddr *addr = msg->args[MSG_ARG_1].p;
+ socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
+
+ int clone_fd = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
+ msg->result = -1;
+ return;
}
- return rpc_sync_call(queue, msg);
-}
+ int domain = addr->sa_family;
+ int type = NETCONN_IS_UDP(sock) ? SOCK_DGRAM : SOCK_STREAM;
+ clone_fd = lwip_socket(domain, type, 0);
+ if (clone_fd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
+ msg->result = clone_fd;
+ return;
+ }
-int32_t rpc_call_recvlistcnt(rpc_queue *queue)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack_recvlist_count);
- if (msg == NULL) {
- return -1;
+ struct lwip_sock *clone_sock = lwip_get_socket(clone_fd);
+ if (clone_sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
+ msg->result = -1;
+ return;
}
- return rpc_sync_call(queue, msg);
+ do_lwip_clone_sockopt(clone_sock, sock);
+
+ while (sock->listen_next) {
+ sock = sock->listen_next;
+ }
+ sock->listen_next = clone_sock;
+
+ int ret = lwip_bind(clone_fd, addr, addr_len);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno);
+ msg->result = ret;
+ return;
+ }
+
+ msg->result = clone_fd;
}
-int32_t rpc_call_arp(rpc_queue *queue, void *mbuf)
+static void callback_accept(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_arp);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ msg->result = -1;
+ struct protocol_stack *stack = get_protocol_stack();
+
+ int accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, msg->args[MSG_ARG_3].i);
+ if (accept_fd < 0) {
+ stack->stats.accept_fail++;
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
}
- msg->sync_flag = 0;
- msg->args[MSG_ARG_0].p = mbuf;
+ struct lwip_sock *sock = lwip_get_socket(accept_fd);
+ if (sock == NULL || sock->stack == NULL) {
+ lwip_close(accept_fd);
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
+ }
- rpc_call(queue, msg);
+ msg->result = accept_fd;
+ sock->stack->conn_num++;
+ if (rte_ring_count(sock->conn->recvmbox->ring)) {
+ do_lwip_add_recvlist(accept_fd);
+ }
+}
- return 0;
+static void callback_connect(struct rpc_msg *msg)
+{
+ msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
+ if (msg->result < 0) {
+ msg->result = -errno;
+ }
}
-int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol)
+int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_socket);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_socket);
if (msg == NULL) {
return -1;
}
@@ -255,9 +357,9 @@ int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_close(rpc_queue *queue, int fd)
+int rpc_call_close(rpc_queue *queue, int fd)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_close);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_close);
if (msg == NULL) {
return -1;
}
@@ -267,20 +369,9 @@ int32_t rpc_call_close(rpc_queue *queue, int fd)
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_stack_exit(rpc_queue *queue)
+int rpc_call_shutdown(rpc_queue *queue, int fd, int how)
{
- struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc);
- if (msg == NULL) {
- return -1;
- }
-
- rpc_call(queue, msg);
- return 0;
-}
-
-int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack_shutdown);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_shutdown);
if (msg == NULL) {
return -1;
}
@@ -291,48 +382,50 @@ int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how)
return rpc_sync_call(queue, msg);
}
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_clean_epoll);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_bind);
if (msg == NULL) {
- return;
+ return -1;
}
- msg->args[MSG_ARG_0].p = wakeup;
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].cp = addr;
+ msg->args[MSG_ARG_2].socklen = addrlen;
- rpc_sync_call(queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+int rpc_call_listen(rpc_queue *queue, int s, int backlog)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_bind);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_listen);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_0].i = s;
+ msg->args[MSG_ARG_1].i = backlog;
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog)
+int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_listen);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_create_shadow_fd);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].i = s;
- msg->args[MSG_ARG_1].i = backlog;
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].cp = addr;
+ msg->args[MSG_ARG_2].socklen = addrlen;
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_accept);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_accept);
if (msg == NULL) {
return -1;
}
@@ -345,9 +438,9 @@ int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
+int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_connect);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_connect);
if (msg == NULL) {
return -1;
}
@@ -356,7 +449,7 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr,
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- int32_t ret = rpc_sync_call(queue, msg);
+ int ret = rpc_sync_call(queue, msg);
if (ret < 0) {
errno = -ret;
return -1;
@@ -364,9 +457,45 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr,
return ret;
}
-int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+static void callback_getpeername(struct rpc_msg *msg)
+{
+ msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
+}
+
+static void callback_getsockname(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getpeername);
+ msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
+}
+
+static void callback_getsockopt(struct rpc_msg *msg)
+{
+ msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
+ msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
+ msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
+ }
+}
+
+static void callback_setsockopt(struct rpc_msg *msg)
+{
+ msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
+ msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
+ msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
+ }
+}
+
+int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getpeername);
if (msg == NULL) {
return -1;
}
@@ -378,9 +507,9 @@ int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, so
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getsockname);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getsockname);
if (msg == NULL) {
return -1;
}
@@ -392,9 +521,9 @@ int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, so
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen)
+int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getsockopt);
if (msg == NULL) {
return -1;
}
@@ -408,9 +537,9 @@ int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, vo
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen)
+int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_setsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_setsockopt);
if (msg == NULL) {
return -1;
}
@@ -424,51 +553,91 @@ int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, co
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val)
+static void callback_tcp_send(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_fcntl);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ size_t len = msg->args[MSG_ARG_1].size;
+ struct protocol_stack *stack = get_protocol_stack();
+ int replenish_again;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (POSIX_IS_CLOSED(sock)) {
+ msg->result = -1;
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].i = cmd;
- msg->args[MSG_ARG_2].l = val;
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
+ }
- return rpc_sync_call(queue, msg);
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
+ if (replenish_again < 0) {
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
+ }
+
+ if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
+ if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ return;
+ }
+ }
+
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
}
-int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp)
+static void callback_udp_send(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_ioctl);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ size_t len = msg->args[MSG_ARG_1].size;
+ struct protocol_stack *stack = get_protocol_stack();
+ int replenish_again;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (POSIX_IS_CLOSED(sock)) {
+ msg->result = -1;
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].l = cmd;
- msg->args[MSG_ARG_2].p = argp;
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
+ }
- return rpc_sync_call(queue, msg);
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
+ if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
+ rpc_call_replenish(&stack->rpc_queue, sock);
+ return;
+ }
+
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
}
-int32_t rpc_call_replenish(rpc_queue *queue, void *sock)
+int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_replenish_sendring);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_udp_send);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].p = sock;
+ if (get_protocol_stack_group()->latency_start) {
+ time_stamp_into_rpcmsg(lwip_get_socket(fd));
+ }
+
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].size = len;
+ msg->args[MSG_ARG_2].i = flags;
msg->sync_flag = 0;
rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
+int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_tcp_send);
if (msg == NULL) {
return -1;
}
@@ -483,28 +652,173 @@ int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->sync_flag = 0;
rpc_call(queue, msg);
+ return 0;
+}
+
+static void callback_replenish_sendring(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
+
+ msg->result = do_lwip_replenish_sendring(stack, sock);
+ if (msg->result == true) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ }
+}
+
+int rpc_call_replenish(rpc_queue *queue, void *sock)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_replenish_sendring);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = sock;
+ msg->sync_flag = 0;
+
+ rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
+static void callback_recvlist_count(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct list_node *list = &stack->recv_list;
+ int count = 0;
+ struct list_node *node;
+ struct list_node *temp;
+
+ list_for_each_node(node, temp, list) {
+ count++;
+ }
+ msg->result = count;
+}
+
+int rpc_call_recvlistcnt(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_recvlist_count);
if (msg == NULL) {
return -1;
}
- if (get_protocol_stack_group()->latency_start) {
- time_stamp_into_rpcmsg(lwip_get_socket(fd));
+ return rpc_sync_call(queue, msg);
+}
+
+static void callback_clean_epoll(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
+
+ list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
+}
+
+void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll);
+ if (msg == NULL) {
+ return;
+ }
+
+ msg->args[MSG_ARG_0].p = wakeup;
+
+ rpc_sync_call(queue, msg);
+}
+
+static void callback_arp(struct rpc_msg *msg)
+{
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p;
+ struct protocol_stack *stack = get_protocol_stack();
+
+ eth_dev_recv(mbuf, stack);
+}
+
+int rpc_call_arp(rpc_queue *queue, void *mbuf)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_arp);
+ if (msg == NULL) {
+ return -1;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].size = len;
- msg->args[MSG_ARG_2].i = flags;
msg->sync_flag = 0;
+ msg->args[MSG_ARG_0].p = mbuf;
rpc_call(queue, msg);
return 0;
}
+static void callback_mempool_size(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+
+ msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool);
+}
+
+static void callback_get_conntable(struct rpc_msg *msg)
+{
+ struct gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p;
+ unsigned max_num = msg->args[MSG_ARG_1].u;
+
+ msg->result = do_lwip_get_conntable(conn, max_num);
+}
+
+static void callback_get_connnum(struct rpc_msg *msg)
+{
+ msg->result = do_lwip_get_connnum();
+}
+
+int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_get_conntable);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ msg->args[MSG_ARG_0].p = conn_table;
+ msg->args[MSG_ARG_1].u = max_conn;
+
+ return rpc_sync_call(queue, msg);
+}
+
+int rpc_call_connnum(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_get_connnum);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(queue, msg);
+}
+
+int rpc_call_mbufpoolsize(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_mempool_size);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(queue, msg);
+}
+
+extern void thread_register_phase1(struct rpc_msg *msg);
+int rpc_call_thread_regphase1(rpc_queue *queue, void *conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = conn;
+ return rpc_sync_call(queue, msg);
+}
+
+extern void thread_register_phase2(struct rpc_msg *msg);
+int rpc_call_thread_regphase2(rpc_queue *queue, void *conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = conn;
+ return rpc_sync_call(queue, msg);
+}
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index d058409..965a0cb 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -13,7 +13,10 @@
#ifndef _GAZELLE_DPDK_H_
#define _GAZELLE_DPDK_H_
-#include <lwip/lwipgz_flow.h>
+#include <rte_ring.h>
+#include <rte_mbuf.h>
+#include <rte_mempool.h>
+
#include "common/gazelle_opt.h"
#include "common/gazelle_dfx_msg.h"
@@ -32,32 +35,34 @@
*/
#define MBUF_MAX_NUM 0xfffffff
+struct protocol_stack;
+
+int32_t dpdk_eal_init(void);
+void lstack_log_level_init(void);
+
+int dpdk_ethdev_init(int port_id);
+int dpdk_ethdev_start(void);
+int init_dpdk_ethdev(void);
+
int thread_affinity_default(void);
int thread_affinity_init(int cpu_id);
-struct protocol_stack;
-struct rte_mempool;
-struct rte_ring;
-struct rte_mbuf;
+int32_t create_shared_ring(struct protocol_stack *stack);
int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, uint32_t mbuf_num);
-int32_t dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack);
struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t size,
uint32_t flags, int32_t idx);
-int32_t create_shared_ring(struct protocol_stack *stack);
-void lstack_log_level_init(void);
-int dpdk_ethdev_init(int port_id);
-int dpdk_ethdev_start(void);
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
+ uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id);
+int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve);
+
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
void dpdk_skip_nic_init(void);
void dpdk_restore_pci(void);
#endif
int32_t dpdk_init_lstack_kni(void);
-bool port_in_stack_queue(gz_addr_t *src_ip, gz_addr_t *dst_ip, uint16_t src_port, uint16_t dst_port);
-struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
- uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id);
void dpdk_nic_xstats_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id);
-int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve);
void dpdk_nic_features_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id);
+
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_epoll.h b/src/lstack/include/lstack_epoll.h
index 6e02615..e7ae26b 100644
--- a/src/lstack/include/lstack_epoll.h
+++ b/src/lstack/include/lstack_epoll.h
@@ -19,14 +19,11 @@
#include <pthread.h>
#include <lwip/lwipgz_list.h>
+#include <lwip/lwipgz_sock.h>
#include "common/gazelle_dfx_msg.h"
#include "common/gazelle_opt.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
enum wakeup_type {
WAKEUP_EPOLL = 0,
WAKEUP_POLL,
@@ -61,9 +58,6 @@ struct wakeup_poll {
pthread_spinlock_t event_list_lock;
};
-struct netconn;
-struct lwip_sock;
-
void add_sock_event(struct lwip_sock *sock, uint32_t event);
void add_sock_event_nolock(struct lwip_sock *sock, uint32_t event);
void del_sock_event(struct lwip_sock *sock, uint32_t event);
@@ -91,8 +85,4 @@ static inline void lstack_block_wakeup(struct wakeup_poll *wakeup)
}
}
-#ifdef __cplusplus
-}
-#endif
-
#endif /* _GAZELLE_EPOLL_H_ */
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index b972f11..0c7bb62 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -15,8 +15,12 @@
#include <stdbool.h>
#include "common/gazelle_dfx_msg.h"
+#include "common/dpdk_common.h"
struct lwip_sock;
+struct rpc_msg;
+struct protocol_stack;
+
unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
@@ -25,11 +29,6 @@ unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
#define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP)
-struct rte_mempool;
-struct rpc_msg;
-struct rte_mbuf;
-struct protocol_stack;
-
void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 7dce757..fdd5388 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -17,6 +17,10 @@
#include <sys/epoll.h>
#include <stdbool.h>
+#include <rte_ring.h>
+#include <rte_mbuf.h>
+#include <rte_mempool.h>
+
#include <lwip/lwipgz_list.h>
#include <lwip/netif.h>
@@ -35,10 +39,6 @@
#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
-struct rte_mempool;
-struct rte_ring;
-struct rte_mbuf;
-
struct protocol_stack {
uint32_t tid;
uint16_t queue_id;
@@ -111,50 +111,23 @@ struct protocol_stack_group {
};
long get_stack_tid(void);
+
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
struct protocol_stack *get_bind_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
+int get_min_conn_stack(struct protocol_stack_group *stack_group);
+void bind_to_stack_numa(struct protocol_stack *stack);
+void thread_bind_stack(struct protocol_stack *stack);
+
int32_t stack_group_init(void);
void stack_group_exit(void);
+void stack_exit(void);
+
int32_t stack_setup_thread(void);
int32_t stack_setup_app_thread(void);
-void bind_to_stack_numa(struct protocol_stack *stack);
-int32_t init_dpdk_ethdev(void);
-
-void wait_sem_value(sem_t *sem, int32_t wait_value);
-
-/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
-void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack);
-
-/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
-int32_t stack_broadcast_close(int32_t fd);
-
-int stack_broadcast_shutdown(int fd, int how);
-
-/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_listen(int32_t fd, int backlog);
-int32_t stack_single_listen(int32_t fd, int32_t backlog);
-
-/* bind sync to all protocol stack thread, only for udp protocol */
-int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
-int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
-
-/* ergodic the protocol stack thread to find the connection, because all threads are listening */
-int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags);
-
-struct wakeup_poll;
-void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
-
-void stack_send_pkts(struct protocol_stack *stack);
-
-struct thread_params {
- uint16_t queue_id;
- uint16_t idx;
-};
-
int stack_polling(uint32_t wakeup_tick);
+
#endif
diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h
deleted file mode 100644
index 77b18bd..0000000
--- a/src/lstack/include/lstack_rpc_proc.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
-* gazelle is licensed under the Mulan PSL v2.
-* You can use this software according to the terms and conditions of the Mulan PSL v2.
-* You may obtain a copy of Mulan PSL v2 at:
-* http://license.coscl.org.cn/MulanPSL2
-* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-* PURPOSE.
-* See the Mulan PSL v2 for more details.
-*/
-
-#ifndef __GAZELLE_RPC_PROC_H__
-#define __GAZELLE_RPC_PROC_H__
-#include "lstack_thread_rpc.h"
-
-void stack_clean_epoll(struct rpc_msg *msg);
-void stack_arp(struct rpc_msg *msg);
-void stack_socket(struct rpc_msg *msg);
-void stack_close(struct rpc_msg *msg);
-void stack_shutdown(struct rpc_msg *msg);
-void stack_bind(struct rpc_msg *msg);
-void stack_listen(struct rpc_msg *msg);
-void stack_accept(struct rpc_msg *msg);
-void stack_connect(struct rpc_msg *msg);
-void stack_recv(struct rpc_msg *msg);
-void stack_getpeername(struct rpc_msg *msg);
-void stack_getsockname(struct rpc_msg *msg);
-void stack_getsockopt(struct rpc_msg *msg);
-void stack_setsockopt(struct rpc_msg *msg);
-void stack_fcntl(struct rpc_msg *msg);
-void stack_ioctl(struct rpc_msg *msg);
-void stack_tcp_send(struct rpc_msg *msg);
-void stack_udp_send(struct rpc_msg *msg);
-void stack_mempool_size(struct rpc_msg *msg);
-void stack_rpcpool_size(struct rpc_msg *msg);
-void stack_create_shadow_fd(struct rpc_msg *msg);
-void stack_replenish_sendring(struct rpc_msg *msg);
-void stack_get_conntable(struct rpc_msg *msg);
-void stack_get_connnum(struct rpc_msg *msg);
-void stack_recvlist_count(struct rpc_msg *msg);
-void stack_exit_by_rpc(struct rpc_msg *msg);
-
-void thread_register_phase1(struct rpc_msg *msg);
-void thread_register_phase2(struct rpc_msg *msg);
-
-#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index d268366..c2654bb 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -35,8 +35,8 @@ struct rpc_stats {
struct rpc_msg;
typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
- int32_t i;
- uint32_t u;
+ int i;
+ unsigned int u;
long l;
unsigned long ul;
void *p;
@@ -63,50 +63,43 @@ static inline void rpc_queue_init(rpc_queue *queue)
{
lockless_queue_init(queue);
}
-
struct rpc_stats *rpc_stats_get(void);
-int32_t rpc_msgcnt(rpc_queue *queue);
-int rpc_poll_msg(rpc_queue *queue, uint32_t max_num);
+int rpc_msgcnt(rpc_queue *queue);
+int rpc_poll_msg(rpc_queue *queue, int max_num);
+
+int rpc_call_stack_exit(rpc_queue *queue);
+
+/* #include <sys/socket.h> will conflict with lwip/sockets.h */
+struct sockaddr;
+
+int rpc_call_close(rpc_queue *queue, int fd);
+int rpc_call_shutdown(rpc_queue *queue, int fd, int how);
+int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol);
+int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+int rpc_call_listen(rpc_queue *queue, int s, int backlog);
+int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+
+int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
+int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen);
+
+int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
+int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
+
+int rpc_call_replenish(rpc_queue *queue, void *sock);
+int rpc_call_recvlistcnt(rpc_queue *queue);
+
void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
-int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_recvlistcnt(rpc_queue *queue);
-int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn);
-int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn);
-int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn);
-int32_t rpc_call_connnum(rpc_queue *queue);
-int32_t rpc_call_arp(rpc_queue *queue, void *mbuf);
-int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol);
-int32_t rpc_call_close(rpc_queue *queue, int32_t fd);
-int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how);
-int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog);
-int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
-int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
-int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
-int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen);
-int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val);
-int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp);
-int32_t rpc_call_replenish(rpc_queue *queue, void *sock);
-int32_t rpc_call_mbufpoolsize(rpc_queue *queue);
-int32_t rpc_call_stack_exit(rpc_queue *queue);
-
-static inline __attribute__((always_inline)) void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
-{
- lockless_queue_mpsc_push(queue, &msg->queue_node);
-}
+int rpc_call_arp(rpc_queue *queue, void *mbuf);
-static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
-{
- pthread_spin_destroy(&msg->lock);
- if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) {
- rte_mempool_put(msg->rpcpool->mempool, (void *)msg);
- } else {
- free(msg);
- }
-}
+int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn);
+int rpc_call_connnum(rpc_queue *queue);
+int rpc_call_mbufpoolsize(rpc_queue *queue);
+
+int rpc_call_thread_regphase1(rpc_queue *queue, void *conn);
+int rpc_call_thread_regphase2(rpc_queue *queue, void *conn);
#endif
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index cf66e15..4f3cbc1 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -43,6 +43,62 @@
#define MBUF_MAX_LEN 1514
#define PACKET_READ_SIZE 32
+/* any protocol stack thread receives arp packet and sync it to other threads,
+ * so that it can have the arp table */
+static void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = NULL;
+ int32_t ret;
+
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ if (cur_stack == stack) {
+ continue;
+ }
+
+ /* stack maybe not init in app thread yet */
+ if (stack == NULL || !(netif_is_up(&stack->netif))) {
+ continue;
+ }
+
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
+ if (ret != 0) {
+ rte_pktmbuf_free(mbuf_copy);
+ return;
+ }
+ }
+#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
+ if (get_global_cfg_params()->kni_switch) {
+ ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ cur_stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+ kni_handle_tx(mbuf_copy);
+ }
+#endif
+ if (get_global_cfg_params()->flow_bifurcation) {
+ ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ cur_stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+ virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy);
+ }
+ return;
+}
+
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
{
int32_t ret;
--
2.33.0
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/together17/gazelle.git
git@gitee.com:together17/gazelle.git
together17
gazelle
gazelle
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385