1 Star 0 Fork 32

misaka00251/gazelle

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0038-refactor-event.patch 107.16 KB
一键复制 编辑 原始数据 按行查看 历史
jinag12 提交于 2022-03-29 23:46 . refactor event
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098
From 9b4379914e97d4c0c267033559bf86d20c7381b6 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng12@huawei.com>
Date: Wed, 30 Mar 2022 00:04:46 +0800
Subject: [PATCH] refactor event
---
README.md | 3 +-
src/common/gazelle_dfx_msg.h | 18 +-
src/lstack/api/lstack_epoll.c | 509 +++++++++++++++--------------
src/lstack/api/lstack_signal.c | 1 +
src/lstack/core/lstack_cfg.c | 27 +-
src/lstack/core/lstack_dpdk.c | 99 ++++--
src/lstack/core/lstack_init.c | 13 +-
src/lstack/core/lstack_lwip.c | 352 +++++++++++---------
src/lstack/core/lstack_protocol_stack.c | 345 +++++++++----------
src/lstack/core/lstack_stack_stat.c | 6 -
src/lstack/core/lstack_thread_rpc.c | 106 +++---
src/lstack/include/lstack_cfg.h | 2 +-
src/lstack/include/lstack_dpdk.h | 7 +-
src/lstack/include/lstack_log.h | 9 +-
src/lstack/include/lstack_lwip.h | 11 +-
src/lstack/include/lstack_protocol_stack.h | 35 +-
src/lstack/include/lstack_thread_rpc.h | 5 +-
src/lstack/include/lstack_vdev.h | 2 +-
src/lstack/include/posix/lstack_epoll.h | 2 +
src/lstack/lstack.conf | 1 -
src/lstack/netif/lstack_ethdev.c | 25 +-
src/lstack/netif/lstack_vdev.c | 2 +-
src/ltran/ltran_dfx.c | 31 +-
src/ltran/ltran_opt.h | 6 +-
24 files changed, 822 insertions(+), 795 deletions(-)
diff --git a/README.md b/README.md
index e914b26..24079c7 100644
--- a/README.md
+++ b/README.md
@@ -236,7 +236,7 @@ Usage: gazellectl [-h | help]
- 提供的命令行、配置文件以及配置大页内存需要root权限执行或修改。非root用户使用,需先提权以及修改文件权限。
- 若要把用户态网卡绑回内核驱动,必须先将Gazelle退出。
- 不支持accept阻塞模式或者connect阻塞模式。
-- 最多只支持20000个链接(需要保证进程内,非网络连接的fd个数小于2000个)。
+- 最多只支持1500个连接。
- 协议栈当前只支持tcp、icmp、arp、ipv4。
- 大页内存不支持在挂载点里创建子目录重新挂载。
- 在对端ping时,要求指定报文长度小于等于14000。
@@ -253,6 +253,7 @@ Usage: gazellectl [-h | help]
- 不使用ltran模式,kni网口只支持本地通讯使用,且需要启动前配置NetworkManager不管理kni网卡
- 虚拟kni网口的ip及mac地址,需要与lstack配置文件保持一致
- gazelle运行过程中,不允许删除运行文件,如果删除,需要重启gazelle
+- lstack配置的ip需要与应用程序的ip保持一致
## Security risk note
gazelle有如下安全风险,用户需要评估使用场景风险
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index de669f5..6db67ee 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -65,11 +65,9 @@ struct gazelle_stat_pkts {
uint64_t rx_allocmbuf_fail;
uint64_t tx_allocmbuf_fail;
uint64_t call_msg_cnt;
- uint16_t weakup_ring_cnt;
uint16_t conn_num;
uint16_t send_idle_ring_cnt;
uint64_t event_list;
- uint64_t wakeup_list;
uint64_t read_lwip_drop;
uint64_t read_lwip_cnt;
uint64_t write_lwip_drop;
@@ -79,22 +77,13 @@ struct gazelle_stat_pkts {
uint64_t app_write_idlefail;
uint64_t app_write_drop;
uint64_t recv_list;
- uint64_t lwip_events;
- uint64_t weakup_events;
+ uint64_t wakeup_events;
uint64_t app_events;
uint64_t call_alloc_fail;
- uint64_t read_events;
- uint64_t write_events;
- uint64_t accept_events;
uint64_t read_null;
- uint64_t remove_event;
- uint64_t send_self_rpc;
uint64_t call_null;
uint64_t arp_copy_fail;
- uint64_t epoll_pending;
- uint64_t epoll_pending_call;
- uint64_t epoll_self_call;
- uint64_t epoll_self_event;
+ uint64_t send_self_rpc;
uint64_t send_list;
};
@@ -169,8 +158,7 @@ struct gazelle_stat_lstack_conn_info {
uint32_t send_ring_cnt;
uint32_t recv_ring_cnt;
uint32_t tcp_sub_state;
- uint32_t event_ring_cnt;
- uint32_t self_ring_cnt;
+ int32_t sem_cnt;
};
struct gazelle_stat_lstack_conn {
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index e54d496..b8d53f6 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -14,6 +14,7 @@
#include <securec.h>
#include <sys/epoll.h>
#include <time.h>
+#include <poll.h>
#include <lwip/lwipsock.h>
#include <lwip/sockets.h>
@@ -21,6 +22,7 @@
#include <lwip/api.h>
#include <lwip/tcp.h>
#include <lwip/timeouts.h>
+#include <lwip/posix_api.h>
#include "lstack_compiler.h"
#include "lstack_ethdev.h"
@@ -28,127 +30,103 @@
#include "lstack_cfg.h"
#include "lstack_log.h"
#include "gazelle_base_func.h"
-#include "lstack_weakup.h"
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
-#define EPOLL_INTERVAL_10MS 10000000
+#define EPOLL_KERNEL_INTERVAL 10 /* ms */
+#define EPOLL_NSEC_TO_SEC 1000000000
+#define EPOLL_MAX_EVENTS 512
-static PER_THREAD struct weakup_poll g_weakup_poll = {0};
-
-enum POLL_TYPE {
- TYPE_POLL,
- TYPE_EPOLL,
-};
-
-static inline bool check_event_vaild(struct lwip_sock *sock, uint32_t event)
-{
- if ((event & EPOLLIN) && !NETCONN_IS_ACCEPTIN(sock) && !NETCONN_IS_DATAIN(sock)) {
- event &= ~EPOLLIN;
- }
-
- if ((event & EPOLLOUT) && !NETCONN_IS_DATAOUT(sock)) {
- event &= ~EPOLLOUT;
- }
-
- return (event) ? true : false;
-}
-
-static inline bool report_events(struct lwip_sock *sock, uint32_t event)
-{
- /* error event */
- if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
- return true;
- }
-
- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
- return false;
- }
-
- return check_event_vaild(sock, event);
-}
+static PER_THREAD struct wakeup_poll g_wakeup_poll = {0};
+static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */
void add_epoll_event(struct netconn *conn, uint32_t event)
{
/* conn sock nerver null, because lwip call this func */
struct lwip_sock *sock = get_socket(conn->socket);
- /* close_wait event should be (EPOLLRDHUP | EPOLLIN), but lwip is EPOLLERR */
- if (event == EPOLLERR && conn->pcb.tcp && conn->pcb.tcp->state == CLOSE_WAIT) {
- event = EPOLLRDHUP | EPOLLIN | EPOLLERR;
- }
-
if ((event & sock->epoll_events) == 0) {
return;
}
+
sock->events |= event & sock->epoll_events;
- if (!sock->weakup || !report_events(sock, event)) {
- return;
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+ if (g_use_epoll && list_is_empty(&sock->event_list)) {
+ list_add_node(&sock->stack->event_list, &sock->event_list);
}
+#endif
- if (weakup_enqueue(sock->stack->weakup_ring, sock)) {
- if (list_is_empty(&sock->event_list)) {
- list_add_node(&sock->stack->event_list, &sock->event_list);
+ if (sock->wakeup) {
+ sock->stack->stats.wakeup_events++;
+ if (get_protocol_stack_group()->wakeup_enable) {
+ rte_ring_sp_enqueue(sock->stack->wakeup_ring, &sock->wakeup->event_sem);
+ } else {
+ sem_post(&sock->wakeup->event_sem);
}
- } else {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- sock->stack->stats.weakup_events++;
}
}
-static void raise_pending_events(struct lwip_sock *sock)
+static inline uint32_t update_events(struct lwip_sock *sock)
{
- struct weakup_poll *wakeup = sock->weakup;
- struct protocol_stack *stack = sock->stack;
- struct netconn *conn = sock->conn;
- if (wakeup == NULL || stack == NULL || conn == NULL) {
- return;
+ uint32_t event = 0;
+
+ if (sock->epoll_events & EPOLLIN) {
+ if (sock->attach_fd > 0 && NETCONN_IS_ACCEPTIN(sock)) {
+ event |= EPOLLIN;
+ }
+
+ if (sock->attach_fd < 0 && NETCONN_IS_DATAIN(sock)) {
+ event |= EPOLLIN;
+ }
}
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock;
- if (attach_sock == NULL) {
- return;
+ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) {
+ event |= EPOLLOUT;
}
- conn = attach_sock->conn;
- if (conn == NULL) {
- return;
+ if ((sock->epoll_events & EPOLLERR) && (sock->events & EPOLLERR)) {
+ event |= EPOLLERR | EPOLLIN;
}
- struct tcp_pcb *tcp = conn->pcb.tcp;
- if ((tcp == NULL) || (tcp->state < ESTABLISHED)) {
+
+ return event;
+}
+
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+void update_stack_events(struct protocol_stack *stack)
+{
+ if (!g_use_epoll) {
return;
}
- uint32_t event = 0;
- if (sock->epoll_events & EPOLLIN) {
- if (attach_sock->recv_lastdata || rte_ring_count(attach_sock->recv_ring) || NETCONN_IS_ACCEPTIN(attach_sock)) {
- event |= EPOLLIN;
- }
- }
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &stack->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
- if (sock->epoll_events & EPOLLOUT) {
- if ((attach_sock->sendevent > 0) ||
- ((tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) && (tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT))) {
- event |= EPOLLOUT;
+ sock->events = update_events(sock);
+ if (sock->events != 0) {
+ continue;
}
- }
- if (attach_sock->errevent > 0) {
- event |= POLLERR | POLLIN;
+ if (pthread_spin_trylock(&stack->event_lock)) {
+ continue;
+ }
+ list_del_node_init(&sock->event_list);
+ pthread_spin_unlock(&stack->event_lock);
}
+}
+#endif
- if (event == 0) {
+static void raise_pending_events(struct lwip_sock *sock)
+{
+ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock;
+ if (attach_sock == NULL) {
return;
}
- attach_sock->events |= event;
- if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 ||
- rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
- sem_post(&wakeup->event_sem);
- stack->stats.epoll_pending++;
- } else {
- rpc_call_addevent(stack, attach_sock);
- stack->stats.epoll_pending_call++;
+
+ attach_sock->events = update_events(attach_sock);
+ if (attach_sock->events & attach_sock->epoll_events) {
+ rpc_call_addevent(attach_sock->stack, attach_sock);
}
}
@@ -166,34 +144,35 @@ int32_t lstack_epoll_create(int32_t size)
GAZELLE_RETURN(EINVAL);
}
- struct weakup_poll *weakup = malloc(sizeof(struct weakup_poll));
- if (weakup == NULL) {
+ struct wakeup_poll *wakeup = malloc(sizeof(struct wakeup_poll));
+ if (wakeup == NULL) {
posix_api->close_fn(fd);
GAZELLE_RETURN(EINVAL);
}
- memset_s(weakup, sizeof(struct weakup_poll), 0, sizeof(struct weakup_poll));
- sem_init(&weakup->event_sem, 0, 0);
-
- weakup->event_ring = create_ring("RING_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
- if (weakup->event_ring == NULL) {
- posix_api->close_fn(fd);
- GAZELLE_RETURN(ENOMEM);
- }
-
- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
- if (weakup->self_ring == NULL) {
- posix_api->close_fn(fd);
- GAZELLE_RETURN(ENOMEM);
- }
+ memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll));
+ sem_init(&wakeup->event_sem, 0, 0);
- sock->weakup = weakup;
+ sock->wakeup = wakeup;
+ init_list_node(&wakeup->event_list);
+ g_use_epoll = true;
return fd;
}
int32_t lstack_epoll_close(int32_t fd)
{
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno);
+ GAZELLE_RETURN(EINVAL);
+ }
+
+ if (sock->wakeup) {
+ free(sock->wakeup);
+ }
+ sock->wakeup = NULL;
+
return 0;
}
@@ -219,7 +198,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
}
struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
- if (epoll_sock == NULL || epoll_sock->weakup == NULL) {
+ if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd);
GAZELLE_RETURN(EINVAL);
}
@@ -228,7 +207,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
do {
switch (op) {
case EPOLL_CTL_ADD:
- sock->weakup = epoll_sock->weakup;
+ sock->wakeup = epoll_sock->wakeup;
+ if (list_is_empty(&sock->event_list)) {
+ list_add_node(&sock->wakeup->event_list, &sock->event_list);
+ }
/* fall through */
case EPOLL_CTL_MOD:
sock->epoll_events = events;
@@ -238,6 +220,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
}
break;
case EPOLL_CTL_DEL:
+ list_del_node_init(&sock->event_list);
sock->epoll_events = 0;
break;
default:
@@ -250,176 +233,234 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
return 0;
}
-static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, int32_t fd, uint32_t events)
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
int32_t event_num = 0;
- for (uint32_t i = 0; i < maxevents; i++) {
- /* fds[i].revents != 0, the events is kernel events */
- if (fds[i].fd == fd && fds[i].revents == 0) {
- fds[i].revents = events;
- event_num = 1;
- break;
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
+ for (uint32_t i = 0; i < stack_group->stack_num && event_num < maxevents; i++) {
+ struct protocol_stack *stack = stack_group->stacks[i];
+ int32_t start_event_num = event_num;
+
+ if (pthread_spin_trylock(&stack->event_lock)) {
+ continue;
+ }
+
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &stack->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
+
+ uint32_t event = sock->events & sock->epoll_events;
+ if (event == 0 || sock->wait_close) {
+ continue;
+ }
+
+ events[event_num].events = event;
+ events[event_num].data = sock->ep_data;
+ event_num++;
+
+ if (event_num >= maxevents) {
+ break;
+ }
}
+
+ pthread_spin_unlock(&stack->event_lock);
+
+ __sync_fetch_and_add(&stack->stats.app_events, event_num - start_event_num);
}
return event_num;
}
-
-static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock,
- struct lwip_sock *attach_sock)
+#else
+static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
- /* remove duplicate event */
- for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) {
- if (sock_list[i] == sock) {
- return true;
+ int32_t event_num = 0;
+ struct list_node *node, *temp;
+ list_for_each_safe(node, temp, &wakeup->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
+ if (sock->conn == NULL) {
+ list_del_node_init(&sock->event_list);
+ continue;
}
+
+ struct lwip_sock *temp_sock = sock;
+ do {
+ struct lwip_sock *attach_sock = (temp_sock->attach_fd > 0) ? get_socket(temp_sock->attach_fd) : temp_sock;
+ if (attach_sock == NULL || temp_sock->wait_close) {
+ temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
+ continue;
+ }
+
+ uint32_t event = update_events(attach_sock);
+ if (event != 0) {
+ events[event_num].events = event;
+ events[event_num].data = temp_sock->ep_data;
+ event_num++;
+ if (event_num >= maxevents) {
+ break;
+ }
+ }
+
+ temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
+ } while (temp_sock);
}
- return !check_event_vaild(attach_sock, attach_sock->events);
+ return event_num;
}
+#endif
-static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t maxevents, enum POLL_TYPE etype)
+static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
{
- struct epoll_event *events = (struct epoll_event *)out;
- struct pollfd *fds = (struct pollfd *)out;
-
- if (etype == TYPE_EPOLL) {
- maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
- }
int32_t event_num = 0;
- struct lwip_sock *sock = NULL;
- while (event_num < maxevents) {
- if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) &&
- rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) {
- break;
- }
- __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE);
+ for (uint32_t i = 0; i < nfds; i++) {
+ /* listenfd nextfd pointerto next stack listen, others nextfd=-1 */
+ int32_t fd = fds[i].fd;
+ while (fd > 0) {
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ break;
+ }
- /* sock->stack == NULL mean close sock */
- if (sock->stack == NULL) {
- continue;
- }
+ /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */
+ struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock;
+ if (attach_sock == NULL || sock->wait_close) {
+ fd = sock->nextfd;
+ continue;
+ }
- /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock;
- if (attach_sock == NULL) {
- continue;
- }
+ uint32_t events = update_events(attach_sock);
+ if (events) {
+ fds[i].revents = events;
+ __sync_fetch_and_add(&sock->stack->stats.app_events, 1);
+ event_num++;
+ break;
+ }
- if (remove_event(etype, weakup->sock_list, event_num, sock, attach_sock)) {
- sock->stack->stats.remove_event++;
- continue;
+ fd = sock->nextfd;
}
+ }
- if (etype == TYPE_EPOLL) {
- events[event_num].events = attach_sock->events;
- events[event_num].data = sock->ep_data;
- weakup->sock_list[event_num] = sock;
- event_num++;
- } else {
- /* shadow_fd event notice listen_fd */
- if (attach_sock->shadowed_sock) {
- attach_sock = attach_sock->shadowed_sock;
- }
+ return event_num;
+}
- if (sock->conn) {
- event_num += save_poll_event(fds, maxevents, sock->conn->socket, attach_sock->events);
- }
+static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds)
+{
+ /* when epfd > 0 is epoll type */
+ for (uint32_t i = 0; i < nfds && epfd < 0; i++) {
+ if (get_socket(fds[i].fd) == NULL) {
+ return true;
}
-
- sock->stack->stats.app_events++;
- sem_trywait(&weakup->event_sem); /* each event post sem, so every read down sem */
}
- return event_num;
+ return false;
}
-static inline int32_t remove_kernel_invaild_events(struct pollfd *fds, int32_t nfds, int32_t event_count)
+static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds)
{
- int32_t real_count = 0;
+ int32_t event_num = 0;
- for (int i = 0; i < nfds && real_count < event_count; i++) {
- if (fds[i].fd < 0 || fds[i].revents == 0) {
+ for (uint32_t i = 0; i < nfds; i++) {
+ /* lwip event */
+ if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) {
continue;
}
- struct lwip_sock *sock = get_socket(fds[i].fd);
- if (sock && CONN_TYPE_IS_LIBOS(sock->conn)) {
- fds[i].revents = 0;
+ int32_t ret = posix_api->poll_fn(&fds[i], 1, 0);
+ if (ret < 0) {
+ if (errno != EINTR) {
+ return ret;
+ }
} else {
- real_count++;
+ event_num += ret;
}
}
- return real_count;
+ return event_num;
}
-static int32_t poll_event(struct weakup_poll *weakup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
+static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
{
- struct epoll_event *events = (struct epoll_event *)out;
struct pollfd *fds = (struct pollfd *)out;
+ struct epoll_event *events = (struct epoll_event *)out;
+ bool have_kernel = have_kernel_fd(epfd, fds, maxevents);
int32_t event_num = 0;
- int32_t event_kernel_num = 0;
- struct timespec epoll_interval = {
- .tv_sec = 0,
- .tv_nsec = EPOLL_INTERVAL_10MS,
- };
- uint32_t start_time = sys_now();
+ int32_t poll_time = 0;
+ int32_t ret;
+ /* when epfd > 0 is epoll type */
do {
- /* epoll_wait type */
- if (epfd > 0) {
- event_num += get_lwip_events(weakup, &events[event_num], maxevents - event_num, TYPE_EPOLL);
- if (event_num >= maxevents) {
- break;
- }
+ event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) :
+ poll_lwip_event(fds, maxevents);
- event_kernel_num = posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
+ if (have_kernel) {
+ int32_t event_kernel_num = (epfd > 0) ?
+ posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) :
+ poll_kernel_event(fds, maxevents);
if (event_kernel_num < 0) {
- break;
+ return event_kernel_num;
}
event_num += event_kernel_num;
- } else {
- /* for poll events, we need to distiguish kernel events and gazelle events */
- event_kernel_num = posix_api->poll_fn(fds, maxevents, 0);
- if (event_kernel_num < 0) {
+ if (timeout >= 0 && poll_time >= timeout) {
break;
}
- event_kernel_num = remove_kernel_invaild_events(fds, maxevents, event_kernel_num);
- event_num += event_kernel_num;
-
- event_num += get_lwip_events(weakup, fds, maxevents, TYPE_POLL);
+ poll_time += EPOLL_KERNEL_INTERVAL;
}
if (event_num > 0) {
break;
}
- sem_timedwait(&weakup->event_sem, &epoll_interval);
- if (timeout > 0) {
- timeout = update_timeout(timeout, start_time);
+ int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout;
+ struct timespec epoll_interval;
+ clock_gettime(CLOCK_REALTIME, &epoll_interval);
+ epoll_interval.tv_sec += interval / 1000;
+ epoll_interval.tv_nsec += (interval % 1000) * 1000000;
+ epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000;
+ epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000;
+
+ if (timeout < 0 && !have_kernel) {
+ ret = sem_wait(&wakeup->event_sem);
+ } else {
+ ret = sem_timedwait(&wakeup->event_sem, &epoll_interval);
+ }
+
+ if (!have_kernel && ret < 0) {
+ break;
}
- } while (timeout != 0);
+ } while (event_num <= maxevents);
- return (event_kernel_num < 0) ? event_kernel_num : event_num;
+ return event_num;
}
-static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *weakup)
+int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
{
- int32_t stack_id = 0;
- int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+ /* avoid the starvation of epoll events from both netstack */
+ maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
- if (weakup->event_ring == NULL) {
- weakup->event_ring = create_ring("POLL_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
- if (weakup->event_ring == NULL) {
- GAZELLE_RETURN(ENOMEM);
- }
+ struct lwip_sock *sock = get_socket_by_fd(epfd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
- weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
- if (weakup->self_ring == NULL) {
- GAZELLE_RETURN(ENOMEM);
- }
+ if (sock->wakeup == NULL) {
+ return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ }
+
+ return get_event(sock->wakeup, epfd, events, maxevents, timeout);
+}
+
+static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup)
+{
+ int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+
+ if (!wakeup->init) {
+ wakeup->init = true;
+ sem_init(&wakeup->event_sem, 0, 0);
+ } else {
+ while (sem_trywait(&wakeup->event_sem) == 0) {}
}
for (uint32_t i = 0; i < nfds; i++) {
@@ -432,51 +473,33 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we
break;
}
sock->epoll_events = fds[i].events | POLLERR;
- sock->weakup = weakup;
-
- raise_pending_events(sock);
-
- stack_count[sock->stack->queue_id]++;
+ sock->wakeup = wakeup;
/* listenfd list */
fd = sock->nextfd;
+ stack_count[sock->stack->queue_id]++;
} while (fd > 0);
}
- for (uint32_t i = 0; i < get_protocol_stack_group()->stack_num; i++) {
- if (stack_count[i] > stack_count[stack_id]) {
- stack_id = i;
- }
- }
-
- bind_to_stack_numa(stack_id);
-
- return 0;
-}
-
-int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
-{
- /* avoid the starvation of epoll events from both netstack */
- maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
-
- struct lwip_sock *sock = get_socket_by_fd(epfd);
- if (sock == NULL) {
- GAZELLE_RETURN(EINVAL);
+ if (wakeup->bind_stack) {
+ return;
}
- if (sock->weakup == NULL) {
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint32_t bind_id = 0;
+ for (uint32_t i = 0; i < stack_group->stack_num; i++) {
+ if (stack_count[i] > stack_count[bind_id]) {
+ bind_id = i;
+ }
}
- return poll_event(sock->weakup, epfd, events, maxevents, timeout);
+ bind_to_stack_numa(stack_group->stacks[bind_id]);
+ wakeup->bind_stack = stack_group->stacks[bind_id];
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- int32_t ret = poll_init(fds, nfds, &g_weakup_poll);
- if (ret != 0) {
- return -1;
- }
+ poll_init(fds, nfds, &g_wakeup_poll);
- return poll_event(&g_weakup_poll, -1, fds, nfds, timeout);
+ return get_event(&g_wakeup_poll, -1, fds, nfds, timeout);
}
diff --git a/src/lstack/api/lstack_signal.c b/src/lstack/api/lstack_signal.c
index 5e4af56..f4763e8 100644
--- a/src/lstack/api/lstack_signal.c
+++ b/src/lstack/api/lstack_signal.c
@@ -16,6 +16,7 @@
#include <execinfo.h>
#include <unistd.h>
#include <lwip/lwipsock.h>
+#include <lwip/posix_api.h>
#include "lstack_log.h"
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 53712a8..13086a3 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -33,14 +33,14 @@
#include "gazelle_reg_msg.h"
#include "lstack_log.h"
#include "gazelle_base_func.h"
-#include "gazelle_parse_config.h"
#include "lstack_protocol_stack.h"
+#include "gazelle_parse_config.h"
#define DEFAULT_CONF_FILE "/etc/gazelle/lstack.conf"
#define LSTACK_CONF_ENV "LSTACK_CONF_PATH"
#define NUMA_CPULIST_PATH "/sys/devices/system/node/node%u/cpulist"
#define DEV_MAC_LEN 17
-#define CPUS_RANGE_NUM 32
+#define CPUS_MAX_NUM 256
static struct cfg_params g_config_params;
@@ -50,7 +50,7 @@ static int32_t parse_host_addr(void);
static int32_t parse_low_power_mode(void);
static int32_t parse_stack_cpu_number(void);
static int32_t parse_use_ltran(void);
-static int32_t parse_weakup_cpu_number(void);
+static int32_t parse_wakeup_cpu_number(void);
static int32_t parse_mask_addr(void);
static int32_t parse_devices(void);
static int32_t parse_dpdk_args(void);
@@ -70,7 +70,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "devices", parse_devices },
{ "dpdk_args", parse_dpdk_args },
{ "num_cpus", parse_stack_cpu_number },
- { "num_wakeup", parse_weakup_cpu_number },
+ { "num_wakeup", parse_wakeup_cpu_number },
{ "low_power_mode", parse_low_power_mode },
{ "kni_switch", parse_kni_switch },
{ NULL, NULL }
@@ -240,7 +240,6 @@ static int32_t parse_stack_cpu_number(void)
}
g_config_params.num_cpu = cnt;
- get_protocol_stack_group()->stack_num = g_config_params.num_cpu;
return 0;
}
@@ -275,10 +274,10 @@ static int32_t numa_to_cpusnum(unsigned socket_id, uint32_t *cpulist, int32_t nu
static int32_t stack_idle_cpuset(struct protocol_stack *stack, cpu_set_t *exclude)
{
- uint32_t cpulist[CPUS_RANGE_NUM];
+ uint32_t cpulist[CPUS_MAX_NUM];
- int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_RANGE_NUM);
- if (cpunum <= 0 ) {
+ int32_t cpunum = numa_to_cpusnum(stack->socket_id, cpulist, CPUS_MAX_NUM);
+ if (cpunum <= 0) {
LSTACK_LOG(ERR, LSTACK, "numa_to_cpusnum failed\n");
return -1;
}
@@ -308,7 +307,7 @@ int32_t init_stack_numa_cpuset(void)
CPU_SET(cfg->cpus[idx], &stack_cpuset);
}
for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) {
- CPU_SET(cfg->weakup[idx], &stack_cpuset);
+ CPU_SET(cfg->wakeup[idx], &stack_cpuset);
}
for (int32_t idx = 0; idx < stack_group->stack_num; ++idx) {
@@ -621,7 +620,7 @@ static int32_t parse_low_power_mode(void)
return 0;
}
-static int32_t parse_weakup_cpu_number(void)
+static int32_t parse_wakeup_cpu_number(void)
{
const config_setting_t *cfg_args = NULL;
const char *args = NULL;
@@ -639,13 +638,19 @@ static int32_t parse_weakup_cpu_number(void)
}
char *tmp_arg = strdup(args);
- int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.weakup, CFG_MAX_CPUS);
+ int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.wakeup, CFG_MAX_CPUS);
free(tmp_arg);
if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
return -EINVAL;
}
g_config_params.num_wakeup = cnt;
+ if (g_config_params.num_wakeup < g_config_params.num_cpu) {
+ LSTACK_PRE_LOG(LSTACK_ERR, "num_wakeup=%d less than num_stack_cpu=%d.\n", g_config_params.num_wakeup,
+ g_config_params.num_cpu);
+ return -EINVAL;
+ }
+
return 0;
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 430c6e5..3f446ea 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -28,6 +28,7 @@
#include <rte_errno.h>
#include <rte_kni.h>
#include <lwip/posix_api.h>
+#include <lwipopts.h>
#include "lstack_log.h"
#include "dpdk_common.h"
@@ -109,35 +110,39 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_
char pool_name[PATH_MAX];
struct rte_mempool *pool;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
/* time stamp before pbuf_custom as priv_data */
+ pthread_mutex_lock(get_mem_mutex());
pool = rte_pktmbuf_pool_create(pool_name, nb_mbuf, mbuf_cache_size,
sizeof(struct pbuf_custom) + GAZELLE_MBUFF_PRIV_SIZE, MBUF_SZ, rte_socket_id());
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+ pthread_mutex_unlock(get_mem_mutex());
return pool;
}
-static struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id)
+struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id)
{
char pool_name[PATH_MAX];
struct rte_mempool *pool;
int32_t ret;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
+ pthread_mutex_lock(get_mem_mutex());
pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL,
NULL, rte_socket_id(), 0);
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+ pthread_mutex_unlock(get_mem_mutex());
return pool;
}
@@ -147,7 +152,7 @@ static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_
char pool_name[PATH_MAX];
struct reg_ring_msg *reg_buf;
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%d", name, queue_id);
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
if (ret < 0) {
return NULL;
}
@@ -167,21 +172,18 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return -1;
}
- stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, 0, stack->queue_id);
+ stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, RX_MBUF_CACHE_SZ,
+ stack->queue_id);
if (stack->rx_pktmbuf_pool == NULL) {
return -1;
}
- stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, 0, stack->queue_id);
+ stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, TX_MBUF_CACHE_SZ,
+ stack->queue_id);
if (stack->tx_pktmbuf_pool == NULL) {
return -1;
}
- stack->rpc_pool = create_rpc_mempool("rpc_msg", stack->queue_id);
- if (stack->rpc_pool == NULL) {
- return -1;
- }
-
if (use_ltran()) {
stack->reg_buf = create_reg_mempool("reg_ring_msg", stack->queue_id);
if (stack->reg_buf == NULL) {
@@ -214,16 +216,12 @@ int32_t create_shared_ring(struct protocol_stack *stack)
{
lockless_queue_init(&stack->rpc_queue);
- stack->weakup_ring = create_ring("SHARED_WEAKUP_RING", VDEV_WEAKUP_QUEUE_SZ, 0, stack->queue_id);
- if (stack->weakup_ring == NULL) {
- return -1;
- }
-
- stack->send_idle_ring = create_ring("SEND_IDLE_RING", VDEV_IDLE_QUEUE_SZ, 0, stack->queue_id);
- if (stack->send_idle_ring == NULL) {
- return -1;
+ if (get_protocol_stack_group()->wakeup_enable) {
+ stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, 0, stack->queue_id);
+ if (stack->wakeup_ring == NULL) {
+ return -1;
+ }
}
- stack->in_replenish = 0;
if (use_ltran()) {
stack->rx_ring = create_ring("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ, stack->queue_id);
@@ -328,8 +326,19 @@ static struct eth_params *alloc_eth_params(uint16_t port_id, uint16_t nb_queues)
return eth_params;
}
+uint64_t get_eth_params_rx_ol(void)
+{
+ return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.rxmode.offloads;
+}
+
+uint64_t get_eth_params_tx_ol(void)
+{
+ return use_ltran() ? 0 : get_protocol_stack_group()->eth_params->conf.txmode.offloads;
+}
+
static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info)
{
+#if CHECKSUM_OFFLOAD_ALL
uint64_t rx_ol = 0;
uint64_t tx_ol = 0;
@@ -337,43 +346,48 @@ static int eth_params_checksum(struct rte_eth_conf *conf, struct rte_eth_dev_inf
uint64_t tx_ol_capa = dev_info->tx_offload_capa;
// rx ip
- if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) {
#if CHECKSUM_CHECK_IP_HW
+ if (rx_ol_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) {
rx_ol |= DEV_RX_OFFLOAD_IPV4_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_IPV4_CKSUM\n");
-#endif
}
+#endif
// rx tcp
- if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) {
#if CHECKSUM_CHECK_TCP_HW
+ if (rx_ol_capa & DEV_RX_OFFLOAD_TCP_CKSUM) {
rx_ol |= DEV_RX_OFFLOAD_TCP_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_RX_OFFLOAD_TCP_CKSUM\n");
-#endif
}
+#endif
// tx ip
- if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) {
#if CHECKSUM_GEN_IP_HW
+ if (tx_ol_capa & DEV_TX_OFFLOAD_IPV4_CKSUM) {
tx_ol |= DEV_TX_OFFLOAD_IPV4_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_IPV4_CKSUM\n");
-#endif
}
+#endif
// tx tcp
- if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
#if CHECKSUM_GEN_TCP_HW
+ if (tx_ol_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
tx_ol |= DEV_TX_OFFLOAD_TCP_CKSUM;
LSTACK_LOG(INFO, LSTACK, "DEV_TX_OFFLOAD_TCP_CKSUM\n");
+ }
#endif
+ if (!(rx_ol & DEV_RX_OFFLOAD_TCP_CKSUM) || !(rx_ol & DEV_RX_OFFLOAD_IPV4_CKSUM)) {
+ rx_ol = 0;
+ }
+ if (!(tx_ol & DEV_TX_OFFLOAD_TCP_CKSUM) || !(tx_ol & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
+ tx_ol = 0;
}
conf->rxmode.offloads = rx_ol;
conf->txmode.offloads = tx_ol;
-#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW || CHECKSUM_GEN_IP_HW || CHECKSUM_GEN_TCP_HW
LSTACK_LOG(INFO, LSTACK, "set checksum offloads\n");
-#endif
+#endif /* CHECKSUM_OFFLOAD_ALL */
return 0;
}
@@ -580,3 +594,30 @@ void dpdk_skip_nic_init(void)
}
}
+int32_t init_dpdk_ethdev(void)
+{
+ int32_t ret;
+
+ ret = dpdk_ethdev_init();
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n");
+ return -1;
+ }
+
+ ret = dpdk_ethdev_start();
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
+ return -1;
+ }
+
+ if (get_global_cfg_params()->kni_switch) {
+ ret = dpdk_init_lstack_kni();
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ sem_post(&stack_group->ethdev_init);
+ return 0;
+}
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 17195c8..774d0f3 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -30,6 +30,7 @@
#include <lwip/tcpip.h>
#include <lwip/memp_def.h>
#include <lwip/lwipopts.h>
+#include <lwip/posix_api.h>
#include "lstack_cfg.h"
#include "lstack_control_plane.h"
@@ -225,16 +226,18 @@ __attribute__((constructor)) void gazelle_network_init(void)
lstack_log_level_init();
- /*
- * Phase 8: memory and nic */
ret = init_protocol_stack();
if (ret != 0) {
LSTACK_EXIT(1, "init_protocol_stack failed\n");
}
- ret = create_stack_thread();
- if (ret != 0) {
- LSTACK_EXIT(1, "create_stack_thread failed\n");
+ /*
+ * Phase 8: nic */
+ if (!use_ltran()) {
+ ret = init_dpdk_ethdev();
+ if (ret != 0) {
+ LSTACK_EXIT(1, "init_dpdk_ethdev failed\n");
+ }
}
/*
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index b4d75d2..887464d 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -17,6 +17,7 @@
#include <arch/sys_arch.h>
#include <lwip/pbuf.h>
#include <lwip/priv/tcp_priv.h>
+#include <lwip/posix_api.h>
#include <securec.h>
#include <rte_errno.h>
#include <rte_malloc.h>
@@ -25,7 +26,6 @@
#include "lstack_ethdev.h"
#include "lstack_protocol_stack.h"
#include "lstack_log.h"
-#include "lstack_weakup.h"
#include "lstack_dpdk.h"
#include "lstack_stack_stat.h"
#include "lstack_lwip.h"
@@ -49,37 +49,82 @@ void listen_list_add_node(int32_t head_fd, int32_t add_fd)
sock->nextfd = add_fd;
}
+static void free_ring_pbuf(struct rte_ring *ring)
+{
+ while (1) {
+ struct pbuf *pbuf = NULL;
+ int32_t ret = rte_ring_sc_dequeue(ring, (void **)&pbuf);
+ if (ret != 0) {
+ break;
+ }
+
+ pbuf_free(pbuf);
+ }
+}
+
static void reset_sock_data(struct lwip_sock *sock)
{
/* check null pointer in ring_free func */
if (sock->recv_ring) {
+ free_ring_pbuf(sock->recv_ring);
rte_ring_free(sock->recv_ring);
}
sock->recv_ring = NULL;
+ if (sock->recv_wait_free) {
+ free_ring_pbuf(sock->recv_wait_free);
+ rte_ring_free(sock->recv_wait_free);
+ }
+ sock->recv_wait_free = NULL;
+
if (sock->send_ring) {
+ free_ring_pbuf(sock->send_ring);
rte_ring_free(sock->send_ring);
}
sock->send_ring = NULL;
+ if (sock->send_idle_ring) {
+ free_ring_pbuf(sock->send_idle_ring);
+ rte_ring_free(sock->send_idle_ring);
+ }
+ sock->send_idle_ring = NULL;
+
sock->stack = NULL;
- sock->weakup = NULL;
+ sock->wakeup = NULL;
sock->events = 0;
sock->nextfd = -1;
sock->attach_fd = -1;
sock->wait_close = false;
- sock->have_event = false;
- sock->have_rpc_send = false;
sock->shadowed_sock = NULL;
+ sock->epoll_events = 0;
+ sock->events = 0;
if (sock->recv_lastdata) {
pbuf_free(sock->recv_lastdata);
- sock->recv_lastdata = NULL;
}
+ sock->recv_lastdata = NULL;
if (sock->send_lastdata) {
pbuf_free(sock->send_lastdata);
- sock->send_lastdata = NULL;
+ }
+ sock->send_lastdata = NULL;
+}
+
+static void replenish_send_idlembuf(struct rte_ring *ring)
+{
+ uint32_t replenish_cnt = rte_ring_free_count(ring);
+
+ for (uint32_t i = 0; i < replenish_cnt; i++) {
+ struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
+ if (pbuf == NULL) {
+ break;
+ }
+
+ int32_t ret = rte_ring_sp_enqueue(ring, (void *)pbuf);
+ if (ret < 0) {
+ pbuf_free(pbuf);
+ break;
+ }
}
}
@@ -99,19 +144,31 @@ void gazelle_init_sock(int32_t fd)
return;
}
+ sock->recv_wait_free = create_ring("wait_free", SOCK_RECV_RING_SIZE, 0, name_tick++);
+ if (sock->recv_wait_free == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "wait_free create failed. errno: %d.\n", rte_errno);
+ return;
+ }
+
sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, name_tick++);
if (sock->send_ring == NULL) {
LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno);
return;
}
+ sock->send_idle_ring = create_ring("idle_send", SOCK_SEND_RING_SIZE, 0, name_tick++);
+ if (sock->send_idle_ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "idle_send create failed. errno: %d.\n", rte_errno);
+ return;
+ }
+ replenish_send_idlembuf(sock->send_idle_ring);
+
sock->stack = get_protocol_stack();
sock->stack->conn_num++;
init_list_node(&sock->recv_list);
init_list_node(&sock->attach_list);
init_list_node(&sock->listen_list);
init_list_node(&sock->event_list);
- init_list_node(&sock->wakeup_list);
init_list_node(&sock->send_list);
}
@@ -129,7 +186,9 @@ void gazelle_clean_sock(int32_t fd)
list_del_node_init(&sock->recv_list);
list_del_node_init(&sock->attach_list);
list_del_node_init(&sock->listen_list);
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
list_del_node_init(&sock->event_list);
+#endif
list_del_node_init(&sock->send_list);
}
@@ -206,101 +265,60 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type)
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
- return pbuf;
-}
-
-void stack_replenish_send_idlembuf(struct protocol_stack *stack)
-{
- uint32_t replenish_cnt = rte_ring_free_count(stack->send_idle_ring);
-
- for (uint32_t i = 0; i < replenish_cnt; i++) {
- struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
- if (pbuf == NULL) {
- break;
- }
-
- int32_t ret = rte_ring_sp_enqueue(stack->send_idle_ring, (void *)pbuf);
- if (ret < 0) {
- gazelle_free_pbuf(pbuf);
- break;
- }
+#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
+ if (pbuf) {
+ pbuf->ol_flags = 0;
+ pbuf->l2_len = 0;
+ pbuf->l3_len = 0;
}
+#endif
+
+ return pbuf;
}
-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags)
+struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
{
struct pbuf *pbuf = NULL;
- ssize_t send_ret = 0;
- ssize_t send_len = 0;
- do {
- if (sock->send_lastdata) {
- pbuf = sock->send_lastdata;
- sock->send_lastdata = NULL;
- } else {
- int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf);
- if (ret != 0) {
- break;
- }
- }
-
- if (sock->conn == NULL || sock->conn->pcb.tcp == NULL) {
- GAZELLE_RETURN(ENOENT);
- }
-
- uint16_t available = tcp_sndbuf(sock->conn->pcb.tcp);
- if (available < pbuf->tot_len) {
- sock->send_lastdata = pbuf;
- break;
- }
-
- ssize_t pbuf_len = pbuf->tot_len;
- send_ret = lwip_send(fd, pbuf, pbuf->tot_len, flags);
- if (send_ret > 0) {
- send_len += send_ret;
- }
- if (send_ret != pbuf_len) {
- sock->stack->stats.write_lwip_drop++;
- break;
+ if (sock->send_lastdata) {
+ pbuf = sock->send_lastdata;
+ sock->send_lastdata = NULL;
+ } else {
+ int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf);
+ if (ret != 0) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ return NULL;
}
+ }
- sock->stack->stats.write_lwip_cnt++;
- } while (1);
-
- return (send_ret < 0) ? send_ret : send_len;
-}
-
-void add_self_event(struct lwip_sock *sock, uint32_t events)
-{
- struct weakup_poll *wakeup = sock->weakup;
- struct protocol_stack *stack = sock->stack;
- if (wakeup == NULL || stack == NULL) {
- return;
+ if (pbuf->tot_len >= remain_size) {
+ sock->send_lastdata = pbuf;
+ *apiflags |= TCP_WRITE_FLAG_MORE; /* set TCP_PSH flag */
+ return NULL;
}
- sock->events |= events;
+ replenish_send_idlembuf(sock->send_idle_ring);
- if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
- return;
+ if ((sock->epoll_events & EPOLLOUT) && rte_ring_free_count(sock->send_ring)) {
+ add_epoll_event(sock->conn, EPOLLOUT);
}
- if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- sem_post(&sock->weakup->event_sem);
- stack->stats.epoll_self_event++;
- } else {
- rpc_call_addevent(stack, sock);
- stack->stats.epoll_self_call++;
- }
+ sock->stack->stats.write_lwip_cnt++;
+ return pbuf;
}
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
{
+ if (sock->events & EPOLLERR) {
+ return 0;
+ }
+
uint32_t free_count = rte_ring_free_count(sock->send_ring);
if (free_count == 0) {
- GAZELLE_RETURN(EAGAIN);
+ return -1;
}
- uint32_t avaible_cont = rte_ring_count(sock->stack->send_idle_ring);
+
+ uint32_t avaible_cont = rte_ring_count(sock->send_idle_ring);
avaible_cont = LWIP_MIN(free_count, avaible_cont);
struct pbuf *pbuf = NULL;
@@ -309,7 +327,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
uint32_t send_pkt = 0;
while (send_len < len && send_pkt < avaible_cont) {
- int32_t ret = rte_ring_sc_dequeue(sock->stack->send_idle_ring, (void **)&pbuf);
+ int32_t ret = rte_ring_sc_dequeue(sock->send_idle_ring, (void **)&pbuf);
if (ret < 0) {
sock->stack->stats.app_write_idlefail++;
break;
@@ -322,28 +340,16 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
ret = rte_ring_sp_enqueue(sock->send_ring, pbuf);
if (ret != 0) {
sock->stack->stats.app_write_drop++;
- gazelle_free_pbuf(pbuf);
+ pbuf_free(pbuf);
break;
}
- sock->stack->stats.app_write_cnt++;
send_len += copy_len;
send_pkt++;
}
+ __sync_fetch_and_add(&sock->stack->stats.app_write_cnt, send_pkt);
- if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
- add_self_event(sock, EPOLLOUT);
- sock->stack->stats.write_events++;
- } else {
- sock->events &= ~EPOLLOUT;
- }
-
- if (rte_ring_free_count(sock->stack->send_idle_ring) > USED_IDLE_WATERMARK && !sock->stack->in_replenish) {
- sock->stack->in_replenish = true;
- rpc_call_replenish_idlembuf(sock->stack);
- }
-
- return send_len;
+ return (send_len <= 0) ? -1 : send_len;
}
void stack_send(struct rpc_msg *msg)
@@ -351,27 +357,62 @@ void stack_send(struct rpc_msg *msg)
int32_t fd = msg->args[MSG_ARG_0].i;
int32_t flags = msg->args[MSG_ARG_2].i;
- struct protocol_stack *stack = get_protocol_stack();
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
msg->result = -1;
return;
}
- msg->result = write_lwip_data(sock, fd, flags);
- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
+ if (!NETCONN_IS_DATAOUT(sock)) {
+ return;
+ }
- if (msg->result >= 0 &&
- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) {
+ /* send all send_ring, so len set lwip send max. */
+ ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ }
+
+ /* have remain data add sendlist */
+ if (NETCONN_IS_DATAOUT(sock)) {
if (list_is_empty(&sock->send_list)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- list_add_node(&stack->send_list, &sock->send_list);
- sock->stack->stats.send_self_rpc++;
+ sock->send_flags = flags;
+ list_add_node(&sock->stack->send_list, &sock->send_list);
}
+ sock->stack->stats.send_self_rpc++;
}
+}
- if (rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
+{
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+ uint32_t read_num = 0;
+
+ list_for_each_safe(node, temp, &stack->send_list) {
+ sock = container_of(node, struct lwip_sock, send_list);
+
+ if (sock->conn == NULL || !NETCONN_IS_DATAOUT(sock)) {
+ list_del_node_init(&sock->send_list);
+ continue;
+ }
+
+ /* send all send_ring, so len set lwip send max. */
+ ssize_t len = lwip_send(sock->conn->socket, sock, UINT16_MAX, sock->send_flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ list_del_node_init(&sock->send_list);
+ }
+
+ if (!NETCONN_IS_DATAOUT(sock)) {
+ list_del_node_init(&sock->send_list);
+ }
+
+ if (++read_num >= send_max) {
+ break;
+ }
}
}
@@ -381,6 +422,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
return 0;
}
+ if (rte_ring_count(sock->recv_wait_free)) {
+ free_ring_pbuf(sock->recv_wait_free);
+ }
+
uint32_t free_count = rte_ring_free_count(sock->recv_ring);
uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring);
uint32_t read_max = LWIP_MIN(free_count, data_count);
@@ -411,6 +456,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
read_count++;
}
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_LWIP);
+ }
+
recv_len += pbuf->len;
/* once we have some data to return, only add more if we don't need to wait */
@@ -425,6 +474,10 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
add_epoll_event(sock->conn, EPOLLIN);
}
sock->stack->stats.read_lwip_cnt += read_count;
+
+ if (recv_len == 0) {
+ GAZELLE_RETURN(EAGAIN);
+ }
return recv_len;
}
@@ -440,7 +493,7 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) ||
((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) ||
((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) {
- GAZELLE_RETURN(EINVAL);
+ GAZELLE_RETURN(EINVAL);
}
buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len);
}
@@ -479,16 +532,14 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
sock->send_flags = flags;
ssize_t send = write_stack_data(sock, buf, len);
-
- ssize_t ret = 0;
- if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- ret = rpc_call_send(fd, buf, len, flags);
- }
-
- if (send <= 0 || ret < 0) {
+ if (send < 0) {
GAZELLE_RETURN(EAGAIN);
+ } else if (send == 0) {
+ return 0;
}
+ rte_smp_mb();
+
+ rpc_call_send(fd, NULL, send, flags);
return send;
}
@@ -505,7 +556,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
if ((message->msg_iov[i].iov_base == NULL) || ((ssize_t)message->msg_iov[i].iov_len <= 0) ||
((size_t)(ssize_t)message->msg_iov[i].iov_len != message->msg_iov[i].iov_len) ||
((ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len) <= 0)) {
- GAZELLE_RETURN(EINVAL);
+ GAZELLE_RETURN(EINVAL);
}
buflen = (ssize_t)(buflen + (ssize_t)message->msg_iov[i].iov_len);
}
@@ -513,7 +564,7 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
for (i = 0; i < message->msg_iovlen; i++) {
ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags);
if (ret < 0) {
- return buflen == 0 ? ret : buflen;
+ return buflen == 0 ? ret : buflen;
}
buflen += ret;
}
@@ -536,6 +587,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
}
sock->recv_flags = flags;
+ if ((sock->events & EPOLLERR) && !NETCONN_IS_DATAIN(sock)) {
+ return 0;
+ }
+
while (recv_left > 0) {
if (sock->recv_lastdata) {
pbuf = sock->recv_lastdata;
@@ -556,22 +611,18 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
if (pbuf->tot_len > copy_len) {
sock->recv_lastdata = pbuf_free_header(pbuf, copy_len);
} else {
- pbuf_free(pbuf);
- sock->recv_lastdata = NULL;
- sock->stack->stats.app_read_cnt++;
if (get_protocol_stack_group()->latency_start) {
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ);
}
+ ret = rte_ring_sp_enqueue(sock->recv_wait_free, pbuf);
+ if (ret != 0) {
+ pbuf_free(pbuf);
+ }
+ sock->recv_lastdata = NULL;
+ __sync_fetch_and_add(&sock->stack->stats.app_read_cnt, 1);
}
}
- if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
- add_self_event(sock, EPOLLIN);
- sock->stack->stats.read_events++;
- } else {
- sock->events &= ~EPOLLIN;
- }
-
if (recvd == 0) {
sock->stack->stats.read_null++;
GAZELLE_RETURN(EAGAIN);
@@ -588,30 +639,32 @@ void add_recv_list(int32_t fd)
}
}
-void read_recv_list(void)
+void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
{
- struct protocol_stack *stack = get_protocol_stack();
struct list_node *list = &(stack->recv_list);
struct list_node *node, *temp;
struct lwip_sock *sock;
- struct lwip_sock *first_sock = NULL;
+ uint32_t read_num = 0;
list_for_each_safe(node, temp, list) {
sock = container_of(node, struct lwip_sock, recv_list);
- /* when read_lwip_data have data wait to read, add sock into recv_list. read_recv_list read this sock again.
- this is dead loop. so every sock just read one time */
- if (sock == first_sock) {
- break;
- }
- if (first_sock == NULL) {
- first_sock = sock;
+ if (sock->conn == NULL || sock->recv_ring == NULL || sock->send_ring == NULL || sock->conn->pcb.tcp == NULL) {
+ list_del_node_init(&sock->recv_list);
+ continue;
}
- /* recv_ring and send_ring maybe create fail, so check here */
- if (sock->conn && sock->recv_ring && sock->send_ring && rte_ring_free_count(sock->recv_ring)) {
+ if (rte_ring_free_count(sock->recv_ring)) {
list_del_node_init(&sock->recv_list);
- lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags);
+ ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ add_epoll_event(sock->conn, EPOLLERR);
+ }
+ }
+
+ if (++read_num >= max_num) {
+ break;
}
}
}
@@ -633,11 +686,13 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
struct lwip_sock *sock = get_socket(netconn->socket);
if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
conn->recv_ring_cnt = rte_ring_count(sock->recv_ring);
+ conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0;
+
conn->send_ring_cnt = rte_ring_count(sock->send_ring);
- struct weakup_poll *weakup = sock->weakup;
- if (weakup) {
- conn->event_ring_cnt = rte_ring_count(weakup->event_ring);
- conn->self_ring_cnt = rte_ring_count(weakup->self_ring);
+ conn->send_ring_cnt += (sock->send_lastdata) ? 1 : 0;
+
+ if (sock->wakeup) {
+ sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt);
}
}
}
@@ -786,11 +841,6 @@ static uint32_t get_list_count(struct list_node *list)
return count;
}
-void stack_wakeuplist_count(struct rpc_msg *msg)
-{
- msg->result = get_list_count(get_protocol_stack()->wakeup_list);
-}
-
void stack_eventlist_count(struct rpc_msg *msg)
{
msg->result = get_list_count(&get_protocol_stack()->event_list);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index e5761a4..da320e2 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -17,6 +17,7 @@
#include <lwip/tcp.h>
#include <lwip/memp_def.h>
#include <lwipsock.h>
+#include <lwip/posix_api.h>
#include <rte_kni.h>
#include <securec.h>
#include <numa.h>
@@ -28,29 +29,34 @@
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
#include "lstack_cfg.h"
-#include "lstack_weakup.h"
#include "lstack_control_plane.h"
#include "lstack_stack_stat.h"
+#define READ_LIST_MAX 32
+#define SEND_LIST_MAX 32
+#define HANDLE_RPC_MSG_MAX 32
+
static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
static struct protocol_stack_group g_stack_group = {0};
static PER_THREAD long g_stack_tid = 0;
+static pthread_mutex_t g_mem_mutex = PTHREAD_MUTEX_INITIALIZER;
typedef void *(*stack_thread_func)(void *arg);
-int32_t bind_to_stack_numa(int32_t stack_id)
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+void update_stack_events(struct protocol_stack *stack);
+#endif
+
+pthread_mutex_t *get_mem_mutex(void)
{
- static PER_THREAD int32_t last_stack_id = -1;
+ return &g_mem_mutex;
+}
+int32_t bind_to_stack_numa(struct protocol_stack *stack)
+{
int32_t ret;
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[stack_id];
pthread_t tid = pthread_self();
- if (last_stack_id == stack_id) {
- return 0;
- }
- last_stack_id = stack_id;
-
ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %d failed\n", rte_gettid(), stack->queue_id);
@@ -159,88 +165,27 @@ void lstack_low_power_idling(void)
}
}
-int32_t init_protocol_stack(void)
+static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct protocol_stack *stack = NULL;
+ /* thread may run slow, if arg is temp var maybe have relese */
+ static uint16_t queue[PROTOCOL_STACK_MAX];
+ char name[PATH_MAX];
+ pthread_t tid;
int32_t ret;
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- stack = malloc(sizeof(*stack));
- if (stack == NULL) {
- return -ENOMEM;
- }
- memset_s(stack, sizeof(*stack), 0, sizeof(*stack));
-
- stack->queue_id = i;
- stack->port_id = stack_group->port_id;
- stack->cpu_id = get_global_cfg_params()->cpus[i];
- stack->socket_id = numa_node_of_cpu(stack->cpu_id);
- if (stack->socket_id < 0) {
- LSTACK_LOG(ERR, PORT, "numa_node_of_cpu failed\n");
- return -EINVAL;
- }
-
- ret = pktmbuf_pool_init(stack, stack_group->stack_num);
- if (ret != 0) {
- return ret;
- }
-
- ret = create_shared_ring(stack);
- if (ret != 0) {
- return ret;
- }
-
- init_list_node(&stack->recv_list);
- init_list_node(&stack->listen_list);
- init_list_node(&stack->event_list);
- init_list_node(&stack->send_list);
-
- stack_group->stacks[i] = stack;
- }
-
- ret = init_stack_numa_cpuset();
- if (ret < 0) {
+ if (queue_id >= PROTOCOL_STACK_MAX) {
+ LSTACK_LOG(ERR, LSTACK, "queue_id is %d exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX);
return -1;
}
+ queue[queue_id] = queue_id;
- if (!use_ltran()) {
- ret = dpdk_ethdev_init();
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_init failed\n");
- return -1;
- }
-
- ret = dpdk_ethdev_start();
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
- return -1;
- }
-
- if (get_global_cfg_params()->kni_switch) {
- ret = dpdk_init_lstack_kni();
- if (ret < 0) {
- return -1;
- }
- }
- }
-
- return 0;
-}
-
-static int32_t create_thread(struct protocol_stack *stack, char *thread_name, stack_thread_func func)
-{
- char name[PATH_MAX];
- pthread_t tid;
- int32_t ret;
-
- ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, stack->queue_id);
+ ret = sprintf_s(name, sizeof(name), "%s%02d", thread_name, queue[queue_id]);
if (ret < 0) {
LSTACK_LOG(ERR, LSTACK, "set name failed\n");
return -1;
}
- ret = pthread_create(&tid, NULL, func, stack);
+ ret = pthread_create(&tid, NULL, func, &queue[queue_id]);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret);
return -1;
@@ -257,148 +202,185 @@ static int32_t create_thread(struct protocol_stack *stack, char *thread_name, st
static void* gazelle_weakup_thread(void *arg)
{
- struct protocol_stack *stack = (struct protocol_stack *)arg;
- int32_t lcore_id = get_global_cfg_params()->weakup[stack->queue_id];
+ uint16_t queue_id = *(uint16_t *)arg;
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+ int32_t lcore_id = get_global_cfg_params()->wakeup[stack->queue_id];
thread_affinity_init(lcore_id);
- LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
- struct list_node wakeup_list;
- init_list_node(&wakeup_list);
- stack->wakeup_list = &wakeup_list;
+ LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
for (;;) {
- wakeup_list_sock(&wakeup_list);
+ sem_t *event_sem;
+ if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) {
+ continue;
+ }
- weakup_thread(stack->weakup_ring, &wakeup_list);
+ sem_post(event_sem);
}
return NULL;
}
-static void stack_thread_init(struct protocol_stack *stack)
+static void init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
{
- uint16_t queue_id = stack->queue_id;
- int32_t ret;
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ memset_s(stack, sizeof(*stack), 0, sizeof(*stack));
set_stack_idx(queue_id);
stack->tid = gettid();
+ stack->queue_id = queue_id;
+ stack->port_id = stack_group->port_id;
+ stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
stack->lwip_stats = &lwip_stats;
- RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
- thread_affinity_init(stack->cpu_id);
+ init_list_node(&stack->recv_list);
+ init_list_node(&stack->listen_list);
+ init_list_node(&stack->event_list);
+ init_list_node(&stack->send_list);
+
+ pthread_spin_init(&stack->event_lock, PTHREAD_PROCESS_SHARED);
sys_calibrate_tsc();
+ stack_stat_init();
- hugepage_init();
+ stack_group->stacks[queue_id] = stack;
+}
- stack_replenish_send_idlembuf(stack);
+static struct protocol_stack * stack_thread_init(uint16_t queue_id)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
- tcpip_init(NULL, NULL);
+ struct protocol_stack *stack = malloc(sizeof(*stack));
+ if (stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n");
+ return NULL;
+ }
+ init_stack_value(stack, queue_id);
- if (use_ltran()) {
- ret = client_reg_thrd_ring();
- if (ret != 0) {
- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret);
- }
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(stack->cpu_id, &cpuset);
+ if (rte_thread_set_affinity(&cpuset) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "rte_thread_set_affinity failed\n");
+ free(stack);
+ return NULL;
}
+ RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
- ret = ethdev_init(stack);
+ stack->socket_id = numa_node_of_cpu(stack->cpu_id);
+ if (stack->socket_id < 0) {
+ LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n");
+ free(stack);
+ return NULL;
+ }
+
+ int32_t ret = pktmbuf_pool_init(stack, stack_group->stack_num);
if (ret != 0) {
- LSTACK_EXIT(1, "failed reg thread ret=%d\n", ret);
+ free(stack);
+ return NULL;
}
- stack_stat_init();
+ ret = create_shared_ring(stack);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
+ }
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- sem_post(&stack_group->thread_inited);
- LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
-}
+ thread_affinity_init(stack->cpu_id);
-static void report_stack_event(struct protocol_stack *stack)
-{
- struct list_node *list = &(stack->event_list);
- struct list_node *node, *temp;
- struct lwip_sock *sock;
+ hugepage_init();
- list_for_each_safe(node, temp, list) {
- sock = container_of(node, struct lwip_sock, event_list);
+ tcpip_init(NULL, NULL);
- if (weakup_enqueue(stack->weakup_ring, sock) == 0) {
- __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
- list_del_node_init(&sock->event_list);
- stack->stats.weakup_events++;
- } else {
- break;
+ if (use_ltran()) {
+ ret = client_reg_thrd_ring();
+ if (ret != 0) {
+ free(stack);
+ return NULL;
}
}
-}
-
-static void send_stack_list(struct protocol_stack *stack)
-{
- struct list_node *list = &(stack->send_list);
- struct list_node *node, *temp;
- struct lwip_sock *sock;
- list_for_each_safe(node, temp, list) {
- sock = container_of(node, struct lwip_sock, send_list);
+ sem_post(&stack_group->thread_phase1);
- if (sock->conn == NULL || sock->stack == NULL) {
- list_del_node_init(&sock->send_list);
- continue;
- }
+ int32_t sem_val;
+ do {
+ sem_getvalue(&stack_group->ethdev_init, &sem_val);
+ } while (!sem_val && !use_ltran());
- ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags);
- __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
- if (ret >= 0 &&
- (rte_ring_count(sock->send_ring) || sock->send_lastdata)) {
- __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
- } else {
- list_del_node_init(&sock->send_list);
- }
+ ret = ethdev_init(stack);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
+ }
- if (rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+ if (stack_group->wakeup_enable) {
+ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
+ if (ret != 0) {
+ free(stack);
+ return NULL;
}
}
+
+ return stack;
}
static void* gazelle_stack_thread(void *arg)
{
- struct protocol_stack *stack = (struct protocol_stack *)arg;
+ uint16_t queue_id = *(uint16_t *)arg;
- stack_thread_init(stack);
+ struct protocol_stack *stack = stack_thread_init(queue_id);
+ if (stack == NULL) {
+ pthread_mutex_lock(&g_mem_mutex);
+ LSTACK_EXIT(1, "stack_thread_init failed\n");
+ pthread_mutex_unlock(&g_mem_mutex);
+ }
+ LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
for (;;) {
- poll_rpc_msg(stack);
+ poll_rpc_msg(stack, HANDLE_RPC_MSG_MAX);
eth_dev_poll();
- read_recv_list();
+ read_recv_list(stack, READ_LIST_MAX);
- sys_timer_run();
+ send_stack_list(stack, SEND_LIST_MAX);
- report_stack_event(stack);
+ sys_timer_run();
- send_stack_list(stack);
+#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
+ update_stack_events(stack);
+#endif
}
return NULL;
}
-int32_t create_stack_thread(void)
+int32_t init_protocol_stack(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
int32_t ret;
- ret = sem_init(&stack_group->thread_inited, 0, 0);
+ stack_group->stack_num = get_global_cfg_params()->num_cpu;
+ stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
+
+ if (!use_ltran()) {
+ ret = sem_init(&stack_group->ethdev_init, 0, 0);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, PORT, "sem_init failed\n");
+ return -1;
+ }
+ }
+
+ ret = sem_init(&stack_group->thread_phase1, 0, 0);
if (ret < 0) {
LSTACK_LOG(ERR, PORT, "sem_init failed\n");
return -1;
}
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(stack_group->stacks[i], "gazellestack", gazelle_stack_thread);
+ ret = create_thread(i, "gazellestack", gazelle_stack_thread);
if (ret != 0) {
return ret;
}
@@ -406,14 +388,12 @@ int32_t create_stack_thread(void)
int32_t thread_inited_num;
do {
- sem_getvalue(&stack_group->thread_inited, &thread_inited_num);
+ sem_getvalue(&stack_group->thread_phase1, &thread_inited_num);
} while (thread_inited_num < stack_group->stack_num);
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(stack_group->stacks[i], "gazelleweakup", gazelle_weakup_thread);
- if (ret != 0) {
- return ret;
- }
+ ret = init_stack_numa_cpuset();
+ if (ret < 0) {
+ return -1;
}
return 0;
@@ -440,7 +420,6 @@ static inline bool is_real_close(int32_t fd)
/* last sock */
if (list_is_empty(&sock->attach_list)) {
- list_del_node_init(&sock->attach_list);
return true;
}
@@ -557,24 +536,6 @@ void stack_listen(struct rpc_msg *msg)
}
}
-static bool have_accept_event(int32_t fd)
-{
- do {
- struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- break;
- }
-
- if (NETCONN_IS_ACCEPTIN(sock)) {
- return true;
- }
-
- fd = sock->nextfd;
- } while (fd > 0);
-
- return false;
-}
-
void stack_accept(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
@@ -593,7 +554,7 @@ void stack_accept(struct rpc_msg *msg)
}
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d attach_fd %d failed %d\n", get_stack_tid(), msg->args[MSG_ARG_0].i,
- fd, accept_fd);
+ fd, accept_fd);
msg->result = -1;
}
@@ -768,13 +729,11 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
{
struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
+ if (sock == NULL || sock->attach_fd < 0) {
errno = EINVAL;
return -1;
}
-
fd = sock->attach_fd;
- int32_t head_fd = fd;
struct lwip_sock *min_sock = NULL;
int32_t min_fd = fd;
@@ -783,15 +742,19 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
if (sock == NULL) {
GAZELLE_RETURN(EINVAL);
}
+ struct lwip_sock *attach_sock = get_socket(sock->attach_fd);
+ if (attach_sock == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
- if (!NETCONN_IS_ACCEPTIN(sock)) {
+ if (!NETCONN_IS_ACCEPTIN(attach_sock)) {
fd = sock->nextfd;
continue;
}
- if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
- min_sock = sock;
- min_fd = fd;
+ if (min_sock == NULL || min_sock->stack->conn_num > attach_sock->stack->conn_num) {
+ min_sock = attach_sock;
+ min_fd = sock->attach_fd;
}
fd = sock->nextfd;
@@ -802,13 +765,7 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
ret = rpc_call_accept(min_fd, addr, addrlen);
}
- if (have_accept_event(head_fd)) {
- add_self_event(sock, EPOLLIN);
- sock = get_socket(head_fd);
- sock->stack->stats.accept_events++;
- }
-
- if(ret < 0) {
+ if (ret < 0) {
errno = EAGAIN;
}
return ret;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 1813906..743857f 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -105,8 +105,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
lstack_get_low_power_info(&dfx->low_power_info);
memcpy_s(&dfx->data.pkts, sizeof(dfx->data.pkts), &stack->stats, sizeof(dfx->data.pkts));
dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
- dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring);
- dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring);
int32_t rpc_call_result = rpc_call_msgcnt(stack);
dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
@@ -120,10 +118,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
rpc_call_result = rpc_call_sendlistcnt(stack);
dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
- if (stack->wakeup_list) {
- rpc_call_result = rpc_call_eventlistcnt(stack);
- dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
- }
dfx->data.pkts.conn_num = stack->conn_num;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 2a67333..26725f7 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -19,9 +19,10 @@
#include "lstack_protocol_stack.h"
#include "lstack_control_plane.h"
#include "gazelle_base_func.h"
+#include "lstack_dpdk.h"
#include "lstack_thread_rpc.h"
-#define HANDLE_RPC_MSG_MAX (8)
+static PER_THREAD struct rte_mempool *rpc_pool = NULL;
static inline __attribute__((always_inline))
struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
@@ -33,11 +34,20 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
return NULL;
}
- ret = rte_mempool_get(stack->rpc_pool, (void **)&msg);
+ static uint16_t pool_index = 0;
+ if (rpc_pool == NULL) {
+ rpc_pool = create_rpc_mempool("rpc_msg", pool_index++);
+ if (rpc_pool == NULL) {
+ return NULL;
+ }
+ }
+
+ ret = rte_mempool_get(rpc_pool, (void **)&msg);
if (ret < 0) {
get_protocol_stack_group()->call_alloc_fail++;
return NULL;
}
+ msg->pool = rpc_pool;
pthread_spin_init(&msg->lock, PTHREAD_PROCESS_SHARED);
msg->func = func;
@@ -47,13 +57,13 @@ struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
}
static inline __attribute__((always_inline))
-void rpc_msg_free(struct rte_mempool *pool, struct rpc_msg *msg)
+void rpc_msg_free(struct rpc_msg *msg)
{
pthread_spin_destroy(&msg->lock);
msg->self_release = 0;
msg->func = NULL;
- rte_mempool_put(pool, (void *)msg);
+ rte_mempool_put(msg->pool, (void *)msg);
}
static inline __attribute__((always_inline))
@@ -64,7 +74,7 @@ void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
}
static inline __attribute__((always_inline))
-int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rpc_msg *msg)
+int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg)
{
int32_t ret;
@@ -74,20 +84,20 @@ int32_t rpc_sync_call(lockless_queue *queue, struct rte_mempool *pool, struct rp
pthread_spin_lock(&msg->lock);
ret = msg->result;
- rpc_msg_free(pool, msg);
+ rpc_msg_free(msg);
return ret;
}
-void poll_rpc_msg(struct protocol_stack *stack)
+void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
{
- int32_t num;
+ uint32_t num;
struct rpc_msg *msg = NULL;
num = 0;
- while (num++ < HANDLE_RPC_MSG_MAX) {
+ while (num++ < max_num) {
lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue);
if (node == NULL) {
- return;
+ break;
}
msg = container_of(node, struct rpc_msg, queue_node);
@@ -103,7 +113,7 @@ void poll_rpc_msg(struct protocol_stack *stack)
if (msg->self_release) {
pthread_spin_unlock(&msg->lock);
} else {
- rpc_msg_free(stack->rpc_pool, msg);
+ rpc_msg_free(msg);
}
}
}
@@ -118,7 +128,7 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3
msg->args[MSG_ARG_0].p = conn_table;
msg->args[MSG_ARG_1].u = max_conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_connnum(struct protocol_stack *stack)
@@ -128,7 +138,7 @@ int32_t rpc_call_connnum(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -142,7 +152,7 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
static void rpc_msgcnt(struct rpc_msg *msg)
@@ -158,7 +168,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn)
@@ -168,7 +178,7 @@ int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn)
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
@@ -178,17 +188,7 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
-}
-
-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count);
- if (msg == NULL) {
- return -1;
- }
-
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
@@ -198,7 +198,7 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
@@ -208,7 +208,7 @@ int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
@@ -218,7 +218,7 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
void add_epoll_event(struct netconn *conn, uint32_t event);
@@ -243,24 +243,6 @@ void rpc_call_addevent(struct protocol_stack *stack, void *sock)
rpc_call(&stack->rpc_queue, msg);
}
-static void rpc_replenish_idlembuf(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- stack_replenish_send_idlembuf(stack);
- stack->in_replenish = 0;
-}
-
-void rpc_call_replenish_idlembuf(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish_idlembuf);
- if (msg == NULL) {
- return;
- }
-
- msg->self_release = 0;
- rpc_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp);
@@ -287,7 +269,7 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol)
msg->args[MSG_ARG_1].i = type;
msg->args[MSG_ARG_2].i = protocol;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_close(int fd)
@@ -300,7 +282,7 @@ int32_t rpc_call_close(int fd)
msg->args[MSG_ARG_0].i = fd;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -315,7 +297,7 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_listen(int s, int backlog)
@@ -329,7 +311,7 @@ int32_t rpc_call_listen(int s, int backlog)
msg->args[MSG_ARG_0].i = s;
msg->args[MSG_ARG_1].i = backlog;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -344,7 +326,7 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
@@ -359,7 +341,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -374,7 +356,7 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
@@ -389,7 +371,7 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen)
@@ -406,7 +388,7 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle
msg->args[MSG_ARG_3].p = optval;
msg->args[MSG_ARG_4].p = optlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen)
@@ -423,7 +405,7 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval,
msg->args[MSG_ARG_3].cp = optval;
msg->args[MSG_ARG_4].socklen = optlen;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_fcntl(int fd, int cmd, long val)
@@ -438,7 +420,7 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val)
msg->args[MSG_ARG_1].i = cmd;
msg->args[MSG_ARG_2].l = val;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
@@ -453,7 +435,7 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
msg->args[MSG_ARG_1].l = cmd;
msg->args[MSG_ARG_2].p = argp;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
@@ -486,7 +468,7 @@ int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags)
msg->args[MSG_ARG_1].cp = msghdr;
msg->args[MSG_ARG_2].i = flags;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
@@ -501,5 +483,5 @@ int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
msg->args[MSG_ARG_1].p = msghdr;
msg->args[MSG_ARG_2].i = flags;
- return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+ return rpc_sync_call(&stack->rpc_queue, msg);
}
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 48b7e44..345a373 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -66,7 +66,7 @@ struct cfg_params {
uint16_t num_cpu;
uint32_t cpus[CFG_MAX_CPUS];
uint16_t num_wakeup;
- uint32_t weakup[CFG_MAX_CPUS];
+ uint32_t wakeup[CFG_MAX_CPUS];
uint8_t num_ports;
uint16_t ports[CFG_MAX_PORTS];
char log_file[PATH_MAX];
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index e76a9a6..e8080e1 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -37,10 +37,10 @@ struct protocol_stack;
#define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM)
-#define MAX_CORE_NUM 256
+#define MAX_CORE_NUM 256
#define CALL_MSG_RING_SIZE (unsigned long long)32
-#define CALL_CACHE_SZ 64
-#define CALL_POOL_SZ ((VDEV_CALL_QUEUE_SZ << 1) + (2 * CALL_CACHE_SZ * MAX_CORE_NUM))
+#define CALL_CACHE_SZ 0
+#define CALL_POOL_SZ 128
/* Layout:
* | rte_mbuf | pbuf | custom_free_function | payload |
@@ -62,6 +62,7 @@ void dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num);
struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id);
int32_t create_shared_ring(struct protocol_stack *stack);
+struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id);
void lstack_log_level_init(void);
int dpdk_ethdev_init(void);
int dpdk_ethdev_start(void);
diff --git a/src/lstack/include/lstack_log.h b/src/lstack/include/lstack_log.h
index 383495d..8b4209a 100644
--- a/src/lstack/include/lstack_log.h
+++ b/src/lstack/include/lstack_log.h
@@ -15,17 +15,14 @@
#include <stdio.h>
#include <syslog.h>
-
#include <rte_log.h>
-#include "lwipopts.h"
-
-#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1
+#define RTE_LOGTYPE_LSTACK RTE_LOGTYPE_USER1
#define LSTACK_EXIT(a, fmt, ...) rte_exit(a, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
#define LSTACK_LOG(a, b, fmt, ...) (void)RTE_LOG(a, b, "%s:%d "fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
-#define LSTACK_INFO LOG_INFO
-#define LSTACK_ERR LOG_ERR
+#define LSTACK_INFO LOG_INFO
+#define LSTACK_ERR LOG_ERR
/* before rte_eal_init */
#define LSTACK_PRE_LOG(level, fmt, ...) \
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index ffd3b80..c73e3a7 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -21,32 +21,31 @@
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
#define NETCONN_IS_DATAIN(sock) ((rte_ring_count((sock)->recv_ring) || (sock)->recv_lastdata))
-#define NETCONN_IS_DATAOUT(sock) rte_ring_free_count((sock)->send_ring)
+#define NETCONN_IS_DATAOUT(sock) ((rte_ring_count((sock)->send_ring) || (sock)->send_lastdata))
+#define NETCONN_IS_OUTIDLE(sock) rte_ring_free_count((sock)->send_ring)
void create_shadow_fd(struct rpc_msg *msg);
void listen_list_add_node(int32_t head_fd, int32_t add_fd);
void gazelle_init_sock(int32_t fd);
int32_t gazelle_socket(int domain, int type, int protocol);
void gazelle_clean_sock(int32_t fd);
-ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags);
+struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags);
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags);
-void read_recv_list(void);
+void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
+void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
void stack_sendlist_count(struct rpc_msg *msg);
void stack_eventlist_count(struct rpc_msg *msg);
-void stack_wakeuplist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
void stack_send(struct rpc_msg *msg);
-void stack_replenish_send_idlembuf(struct protocol_stack *stack);
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num);
void gazelle_free_pbuf(struct pbuf *pbuf);
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags);
ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags);
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags);
-void add_self_event(struct lwip_sock *sock, uint32_t events);
#endif
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 39052e1..9753385 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -14,6 +14,7 @@
#define __GAZELLE_PROTOCOL_STACK_H__
#include <semaphore.h>
+#include <pthread.h>
#include <lwip/list.h>
#include <lwip/netif.h>
#include "dpdk_common.h"
@@ -28,38 +29,32 @@ struct protocol_stack {
uint16_t socket_id;
uint16_t cpu_id;
volatile uint16_t conn_num;
- volatile bool in_replenish;
-
- // for dispatcher thread
- cpu_set_t idle_cpuset;
+ cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */
lockless_queue rpc_queue;
- struct rte_ring *weakup_ring;
-
struct rte_mempool *rx_pktmbuf_pool;
struct rte_mempool *tx_pktmbuf_pool;
- struct rte_mempool *rpc_pool;
struct rte_ring *rx_ring;
struct rte_ring *tx_ring;
struct rte_ring *reg_ring;
- struct rte_ring *send_idle_ring;
+ struct rte_ring *wakeup_ring;
+
struct reg_ring_msg *reg_buf;
struct netif netif;
uint32_t rx_ring_used;
uint32_t tx_ring_used;
+ struct eth_dev_ops *dev_ops;
struct list_node recv_list;
struct list_node listen_list;
- struct list_node event_list;
struct list_node send_list;
- struct list_node *wakeup_list;
+ struct list_node event_list;
+ pthread_spinlock_t event_lock;
struct gazelle_stat_pkts stats;
struct gazelle_stack_latency latency;
struct stats_ *lwip_stats;
-
- struct eth_dev_ops *dev_ops;
};
struct eth_params;
@@ -67,25 +62,35 @@ struct eth_params;
struct protocol_stack_group {
uint16_t stack_num;
uint16_t port_id;
- sem_t thread_inited;
+ sem_t thread_phase1;
+ sem_t ethdev_init;
struct rte_mempool *kni_pktmbuf_pool;
struct eth_params *eth_params;
struct protocol_stack *stacks[PROTOCOL_STACK_MAX];
+ bool wakeup_enable;
/* dfx stats */
bool latency_start;
uint64_t call_alloc_fail;
};
+struct wakeup_poll {
+ bool init;
+ struct protocol_stack *bind_stack;
+ struct list_node event_list; /* epoll temp use poll */
+ sem_t event_sem;
+};
+
long get_stack_tid(void);
+pthread_mutex_t *get_mem_mutex(void);
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
struct protocol_stack *get_minconn_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
int32_t init_protocol_stack(void);
-int32_t create_stack_thread(void);
-int bind_to_stack_numa(int stack_id);
+int32_t bind_to_stack_numa(struct protocol_stack *stack);
+int32_t init_dpdk_ethdev(void);
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack);
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 20539d9..61bcd38 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -42,21 +42,20 @@ struct rpc_msg {
int32_t self_release; /* 0:msg handler release msg 1:msg sender release msg */
int64_t result; /* func return val */
lockless_queue_node queue_node;
+ struct rte_mempool *pool;
rpc_msg_func func; /* msg handle func hook */
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
};
struct protocol_stack;
-void poll_rpc_msg(struct protocol_stack *stack);
-void rpc_call_replenish_idlembuf(struct protocol_stack *stack);
+void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
void rpc_call_addevent(struct protocol_stack *stack, void *sock);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 916b1e2..31a997d 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -23,7 +23,7 @@
#define VDEV_EVENT_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_REG_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_CALL_QUEUE_SZ (DEFAULT_RING_SIZE)
-#define VDEV_WEAKUP_QUEUE_SZ (DEFAULT_RING_SIZE)
+#define VDEV_WAKEUP_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_IDLE_QUEUE_SZ (DEFAULT_RING_SIZE)
#define VDEV_TX_QUEUE_SZ (DEFAULT_RING_SIZE)
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index 2b3cff4..cac640b 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -17,6 +17,8 @@
extern "C" {
#endif
+#include <poll.h>
+
int32_t lstack_epoll_create(int32_t size);
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event *events, int32_t maxevents, int32_t timeout);
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index fdca602..696dfb9 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -16,7 +16,6 @@ kni_switch=0
low_power_mode=0
num_cpus="2"
-num_wakeup="3"
host_addr="192.168.1.10"
mask_addr="255.255.255.0"
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 8b2193f..ae39403 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -51,6 +51,9 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
stack->stats.rx_allocmbuf_fail++;
break;
}
+#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
+ next->ol_flags = m->ol_flags;
+#endif
if (head == NULL) {
head = next;
@@ -73,18 +76,19 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
}
}
+#define READ_PKTS_MAX 32
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
- struct rte_mbuf *pkts[DPDK_PKT_BURST_SIZE];
+ struct rte_mbuf *pkts[READ_PKTS_MAX];
struct protocol_stack *stack = get_protocol_stack();
- nr_pkts = stack->dev_ops->rx_poll(stack, pkts, DPDK_PKT_BURST_SIZE);
+ nr_pkts = stack->dev_ops->rx_poll(stack, pkts, READ_PKTS_MAX);
if (nr_pkts == 0) {
return nr_pkts;
}
- if (get_protocol_stack_group()->latency_start) {
+ if (!use_ltran() && get_protocol_stack_group()->latency_start) {
uint64_t time_stamp = get_current_time();
time_stamp_into_mbuf(nr_pkts, pkts, time_stamp);
}
@@ -146,19 +150,6 @@ static err_t eth_dev_output(struct netif *netif, struct pbuf *pbuf)
return ERR_OK;
}
-static err_t eth_dev_input(struct pbuf *p, struct netif *netif)
-{
- err_t ret = ethernet_input(p, netif);
- if (ret != ERR_OK) {
- return ret;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_lstack_latency(&get_protocol_stack()->latency, p, GAZELLE_LATENCY_LWIP);
- }
- return ret;
-}
-
static err_t eth_dev_init(struct netif *netif)
{
struct cfg_params *cfg = get_global_cfg_params();
@@ -200,7 +191,7 @@ int32_t ethdev_init(struct protocol_stack *stack)
netif_set_default(&stack->netif);
struct netif *netif = netif_add(&stack->netif, &cfg->host_addr, &cfg->netmask, &cfg->gateway_addr, NULL,
- eth_dev_init, eth_dev_input);
+ eth_dev_init, ethernet_input);
if (netif == NULL) {
LSTACK_LOG(ERR, LSTACK, "netif_add failed\n");
return ERR_IF;
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 57d3bce..5a4e86a 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -91,7 +91,7 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt
uint32_t sent_pkts = 0;
do {
- sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts);
+ sent_pkts += rte_eth_tx_burst(stack->port_id, stack->queue_id, &pkts[sent_pkts], nr_pkts - sent_pkts);
} while (sent_pkts < nr_pkts);
return sent_pkts;
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 8db5791..8d71966 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -561,27 +561,16 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("app_write_drop: %-13"PRIu64" ", lstack_stat->data.pkts.app_write_drop);
printf("write_lwip_drop: %-12"PRIu64" ", lstack_stat->data.pkts.write_lwip_drop);
printf("app_write_idlebuf: %-10"PRIu16" \n", lstack_stat->data.pkts.send_idle_ring_cnt);
+ printf("event_list: %-17"PRIu64" ", lstack_stat->data.pkts.event_list);
printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list);
- printf("weakup_ring_cnt: %-12"PRIu16" ", lstack_stat->data.pkts.weakup_ring_cnt);
printf("conn_num: %-19"PRIu16" \n", lstack_stat->data.pkts.conn_num);
- printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events);
- printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events);
- printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events);
- printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending);
- printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event);
- printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event);
- printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events);
- printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events);
- printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events);
- printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null);
- printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list);
- printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list);
- printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
- printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call);
- printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call);
+ printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.wakeup_events);
+ printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.app_events);
+ printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.read_null);
printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
+ printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list);
}
@@ -884,7 +873,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
printf("Active Internet connections (servers and established)\n");
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
- printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address"
+ printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt Local Address "
" Foreign Address State\n");
uint32_t unread_pkts = 0;
uint32_t unsend_pkts = 0;
@@ -894,13 +883,13 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
rip.s_addr = conn_info->rip;
lip.s_addr = conn_info->lip;
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
- printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt,
- conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
+ printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
+ conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt,
+ inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-50u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
diff --git a/src/ltran/ltran_opt.h b/src/ltran/ltran_opt.h
index e4e085d..1117898 100644
--- a/src/ltran/ltran_opt.h
+++ b/src/ltran/ltran_opt.h
@@ -34,12 +34,12 @@
#define GAZELLE_KNI_ETHERNET_HEADER_SIZE 14
#define GAZELLE_KNI_ETHERNET_FCS_SIZE 4
-#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%d"
-#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%d"
+#define GAZELLE_PKT_MBUF_RX_POOL_NAME_FMT "rx_pool%u"
+#define GAZELLE_PKT_MBUF_TX_POOL_NAME_FMT "tx_pool%u"
#define GAZELLE_PKT_MBUF_POOL_NAME_LENGTH 64
#define GAZELLE_BOND_NAME_LENGTH 64
-#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%d"
+#define GAZELLE_BOND_DEV_NAME_FMT "net_bonding%hu"
#define GAZELLE_BOND_QUEUE_MIN 1
#define GAZELLE_BOND_QUEUE_MAX 64
--
1.8.3.1
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/misaka00251/gazelle.git
git@gitee.com:misaka00251/gazelle.git
misaka00251
gazelle
gazelle
master

搜索帮助