openEuler-RISC-V/gazelle

forked from src-openEuler/gazelle 
0148-optimize-app-thread-write-buff-block.patch 36.63 KB
吴昌盛 committed on 2022-12-03 22:12 · optimize-app-thread-write-buff-block
From 8fcc0033d10f7bac263759794199b4ec96372a92 Mon Sep 17 00:00:00 2001
From: wu-changsheng <wuchangsheng2@huawei.com>
Date: Sat, 3 Dec 2022 21:20:28 +0800
Subject: [PATCH 2/2] optimize app thread write buff block
---
src/common/dpdk_common.h | 39 +--
src/common/gazelle_dfx_msg.h | 6 +-
src/common/gazelle_opt.h | 2 +
src/lstack/api/lstack_epoll.c | 2 +-
src/lstack/core/lstack_dpdk.c | 9 +
src/lstack/core/lstack_lwip.c | 389 +++++++++++++++++++------
src/lstack/core/lstack_stack_stat.c | 2 +-
src/lstack/core/lstack_thread_rpc.c | 13 +
src/lstack/include/lstack_dpdk.h | 10 +-
src/lstack/include/lstack_lwip.h | 7 +-
src/lstack/include/lstack_thread_rpc.h | 2 +
src/lstack/netif/lstack_ethdev.c | 4 +-
src/ltran/ltran_dfx.c | 17 +-
13 files changed, 366 insertions(+), 136 deletions(-)
diff --git a/src/common/dpdk_common.h b/src/common/dpdk_common.h
index 753c168..a0c304c 100644
--- a/src/common/dpdk_common.h
+++ b/src/common/dpdk_common.h
@@ -13,6 +13,7 @@
#ifndef __GAZELLE_DPDK_COMMON_H__
#define __GAZELLE_DPDK_COMMON_H__
+#include <stdbool.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
@@ -24,7 +25,7 @@
#define PTR_TO_PRIVATE(mbuf) RTE_PTR_ADD(mbuf, sizeof(struct rte_mbuf))
/* Layout:
- * | rte_mbuf | pbuf | custom_free_function | payload |
+ * | rte_mbuf | pbuf | custom_free_function | tcp_seg | payload |
**/
struct pbuf;
static inline struct rte_mbuf *pbuf_to_mbuf(struct pbuf *p)
@@ -148,7 +149,6 @@ static __rte_always_inline uint32_t gazelle_ring_sp_enqueue(struct rte_ring *r,
return 0;
}
-
__rte_ring_enqueue_elems(r, head, obj_table, sizeof(void *), n);
__atomic_store_n(&r->cons.head, head + n, __ATOMIC_RELEASE);
@@ -169,39 +169,13 @@ static __rte_always_inline uint32_t gazelle_ring_sc_dequeue(struct rte_ring *r,
return 0;
}
-
__rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n);
- r->cons.tail = cons + n;
-
- return n;
-}
-
-/* get ring obj dont dequeue */
-static __rte_always_inline uint32_t gazelle_ring_sc_peek(struct rte_ring *r, void **obj_table, uint32_t n)
-{
- uint32_t prod = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
- uint32_t cons = r->cons.tail;
-
- uint32_t entries = prod - cons;
- if (n > entries) {
- n = entries;
- }
- if (unlikely(n == 0)) {
- return 0;
- }
-
-
- __rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n);
+ __atomic_store_n(&r->cons.tail, cons + n, __ATOMIC_RELEASE);
return n;
}
-static __rte_always_inline void gazelle_ring_dequeue_over(struct rte_ring *r, uint32_t n)
-{
- r->cons.tail += n;
-}
-
static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void **obj_table, uint32_t n)
{
uint32_t cons = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
@@ -222,11 +196,6 @@ static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void *
return n;
}
-static __rte_always_inline void gazelle_ring_read_n(struct rte_ring *r, uint32_t n)
-{
- __atomic_store_n(&r->prod.tail, r->prod.tail + n, __ATOMIC_RELEASE);
-}
-
static __rte_always_inline void gazelle_ring_read_over(struct rte_ring *r)
{
__atomic_store_n(&r->prod.tail, r->prod.head, __ATOMIC_RELEASE);
@@ -240,7 +209,7 @@ static __rte_always_inline uint32_t gazelle_ring_readover_count(struct rte_ring
static __rte_always_inline uint32_t gazelle_ring_readable_count(const struct rte_ring *r)
{
rte_smp_rmb();
- return r->cons.head - r->prod.tail;
+ return r->cons.head - r->prod.head;
}
static __rte_always_inline uint32_t gazelle_ring_count(const struct rte_ring *r)
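
Besides dropping gazelle_ring_sc_peek, gazelle_ring_dequeue_over and gazelle_ring_read_n, the hunks above change gazelle_ring_sc_dequeue to publish the consumer tail with a release store rather than a plain write, so a thread that later reads cons.tail with an acquire load (as gazelle_ring_readlast, added further down in this patch, does) also observes the element copies that preceded the index update. A minimal, generic C11 sketch of that release/acquire pairing follows; it is illustrative only and does not reproduce gazelle's actual ring fields:

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* illustrative stand-ins, not the rte_ring fields used above */
static void *slot;               /* element storage                 */
static _Atomic uint32_t tail;    /* index published by one thread   */

/* publisher: touch the element first, then publish the new index with release */
static void publish_tail(void *obj, uint32_t next)
{
    slot = obj;
    atomic_store_explicit(&tail, next, memory_order_release);
}

/* observer: an acquire load that sees the new index also sees the slot store */
static void *read_if_advanced(uint32_t last_seen)
{
    uint32_t now = atomic_load_explicit(&tail, memory_order_acquire);
    return (now != last_seen) ? slot : NULL;
}

The pbuf->in_write flag introduced later in this patch relies on the same pairing to let the app thread and the stack thread agree on whether the last pbuf is still being filled.
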
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index f6f8d0e..0bdd238 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -70,7 +70,7 @@ struct gazelle_stack_stat {
struct gazelle_wakeup_stat {
uint64_t app_events;
- uint64_t app_write_idlefail;
+ uint64_t app_write_rpc;
uint64_t app_write_cnt;
uint64_t app_read_cnt;
uint64_t read_null;
@@ -157,8 +157,10 @@ struct gazelle_stat_lstack_conn_info {
uint32_t send_ring_cnt;
uint32_t recv_ring_cnt;
uint32_t tcp_sub_state;
- int32_t sem_cnt;
+ uint32_t send_back_cnt;
int32_t fd;
+ uint64_t recv_all;
+ uint64_t send_all;
};
struct gazelle_stat_lstack_conn {
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index 4262e90..aac1ec9 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -47,6 +47,8 @@
#define RTE_TEST_TX_DESC_DEFAULT 2048
#define RTE_TEST_RX_DESC_DEFAULT 4096
+#define MBUF_MAX_DATA_LEN 1460
+
#define DPDK_PKT_BURST_SIZE 512
/* total:33 client, index 32 is invaild client */
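
MBUF_MAX_DATA_LEN is presumably the standard TCP MSS for a 1500-byte Ethernet MTU: 1500 - 20 (IPv4 header) - 20 (TCP header) = 1460 bytes. Each idle send pbuf therefore carries at most one MSS of payload, and a write of len bytes needs ceil(len / 1460) pbufs, which is exactly the write_num computation added to lstack_lwip.c below.
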
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index 35df625..4ea6474 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -321,7 +321,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
if (CONN_TYPE_HAS_HOST(sock->conn)) {
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d\n", fd, epfd, op);
+ LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d errno=%d\n", fd, epfd, op, errno);
}
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 2e8fb45..b9f2793 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -194,6 +194,15 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return 0;
}
+int32_t gazelle_alloc_mbuf_with_reserve(struct rte_mempool *pool, struct rte_mbuf **mbufs, unsigned count)
+{
+ if (rte_mempool_avail_count(pool) < RESERVE_NIC_RECV) {
+ return -1;
+ }
+
+ return rte_pktmbuf_alloc_bulk(pool, mbufs, count);
+}
+
struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id)
{
char ring_name[RTE_RING_NAMESIZE] = {0};
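
The helper above gates TX-side bulk allocation on the shared rxtx mempool keeping at least RESERVE_NIC_RECV mbufs (defined as 1024 in lstack_dpdk.h later in this patch) available, so app-thread writes cannot exhaust the pool and starve NIC receive refill. A stand-alone restatement of that gate, with a hypothetical name and assuming the caller passes the value of rte_mempool_avail_count():

#include <stdbool.h>
#include <stdint.h>

#define RESERVE_NIC_RECV 1024   /* value added to lstack_dpdk.h in this patch */

/* hypothetical helper: refuse TX allocation once the pool drops below the RX reserve */
static bool tx_alloc_allowed(uint32_t pool_avail_mbufs)
{
    return pool_avail_mbufs >= RESERVE_NIC_RECV;
}
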
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index d30ecdc..2cda2d9 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -34,9 +34,6 @@
#include "dpdk_common.h"
#include "lstack_lwip.h"
-#define HALF_DIVISOR (2)
-#define USED_IDLE_WATERMARK (VDEV_IDLE_QUEUE_SZ >> 2)
-
static void free_ring_pbuf(struct rte_ring *ring)
{
void *pbufs[SOCK_RECV_RING_SIZE];
@@ -55,20 +52,41 @@ static void free_ring_pbuf(struct rte_ring *ring)
} while (gazelle_ring_readover_count(ring));
}
+static void free_list_pbuf(struct pbuf *pbuf)
+{
+ while (pbuf) {
+ struct pbuf *del_pbuf = pbuf;
+ pbuf = pbuf->next;
+
+ del_pbuf->next = NULL;
+ pbuf_free(del_pbuf);
+ }
+}
+
static void reset_sock_data(struct lwip_sock *sock)
{
/* check null pointer in ring_free func */
if (sock->recv_ring) {
free_ring_pbuf(sock->recv_ring);
rte_ring_free(sock->recv_ring);
+ sock->recv_ring = NULL;
}
- sock->recv_ring = NULL;
if (sock->send_ring) {
free_ring_pbuf(sock->send_ring);
rte_ring_free(sock->send_ring);
+ sock->send_ring = NULL;
+ }
+
+ if (sock->send_lastdata) {
+ free_list_pbuf(sock->send_lastdata);
+ sock->send_lastdata = NULL;
+ }
+
+ if (sock->send_pre_del) {
+ pbuf_free(sock->send_pre_del);
+ sock->send_pre_del = NULL;
}
- sock->send_ring = NULL;
sock->stack = NULL;
sock->wakeup = NULL;
@@ -76,6 +94,7 @@ static void reset_sock_data(struct lwip_sock *sock)
sock->epoll_events = 0;
sock->events = 0;
sock->in_send = 0;
+ sock->remain_len = 0;
if (sock->recv_lastdata) {
pbuf_free(sock->recv_lastdata);
@@ -97,6 +116,9 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u
pbuf->l4_len = 0;
pbuf->header_off = 0;
pbuf->rexmit = 0;
+ pbuf->in_write = 0;
+ pbuf->head = 0;
+ pbuf->last = pbuf;
}
return pbuf;
@@ -110,13 +132,13 @@ static bool replenish_send_idlembuf(struct protocol_stack *stack, struct rte_rin
uint32_t replenish_cnt = gazelle_ring_free_count(ring);
uint32_t alloc_num = LWIP_MIN(replenish_cnt, RING_SIZE(SOCK_SEND_RING_SIZE));
- if (rte_pktmbuf_alloc_bulk(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbuf, alloc_num) != 0) {
+ if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbuf, alloc_num) != 0) {
stack->stats.tx_allocmbuf_fail++;
return true;
}
for (uint32_t i = 0; i < alloc_num; i++) {
- pbuf[i] = init_mbuf_to_pbuf(pbuf[i], PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
+ pbuf[i] = init_mbuf_to_pbuf(pbuf[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
}
uint32_t num = gazelle_ring_sp_enqueue(ring, pbuf, alloc_num);
@@ -158,6 +180,7 @@ void gazelle_init_sock(int32_t fd)
init_list_node_null(&sock->recv_list);
init_list_node_null(&sock->event_list);
init_list_node_null(&sock->send_list);
+ pthread_spin_init(&sock->sock_lock, PTHREAD_PROCESS_PRIVATE);
}
void gazelle_clean_sock(int32_t fd)
@@ -179,6 +202,7 @@ void gazelle_clean_sock(int32_t fd)
list_del_node_null(&sock->recv_list);
list_del_node_null(&sock->send_list);
+ pthread_spin_destroy(&sock->sock_lock);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -196,7 +220,7 @@ int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs,
{
struct pbuf_custom *pbuf_custom = NULL;
- int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num);
+ int32_t ret = gazelle_alloc_mbuf_with_reserve(pool, mbufs, num);
if (ret != 0) {
return ret;
}
@@ -214,7 +238,7 @@ struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type)
struct rte_mbuf *mbuf;
struct protocol_stack *stack = get_protocol_stack();
- if (rte_pktmbuf_alloc_bulk(stack->rxtx_pktmbuf_pool, &mbuf, 1) != 0) {
+ if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, &mbuf, 1) != 0) {
stack->stats.tx_allocmbuf_fail++;
return NULL;
}
@@ -226,23 +250,55 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
{
struct pbuf *pbuf = NULL;
- if (gazelle_ring_sc_peek(sock->send_ring, (void **)&pbuf, 1) != 1) {
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ if (unlikely(sock->send_pre_del)) {
+ pbuf = sock->send_pre_del;
+ if (pbuf->tot_len > remain_size ||
+ (pbuf->head && __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE))) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ return NULL;
+ }
+
+ if (pbuf->next) {
+ sock->send_lastdata = pbuf->next;
+ pbuf->next = NULL;
+ }
+ return pbuf;
+ }
+
+ if (sock->send_lastdata) {
+ pbuf = sock->send_lastdata;
+ if (pbuf->tot_len > remain_size) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ return NULL;
+ }
+ sock->send_pre_del = pbuf;
+ sock->send_lastdata = pbuf->next;
+ pbuf->next = NULL;
+ return pbuf;
+ }
+
+ gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1);
+ if (pbuf == NULL) {
return NULL;
}
+ sock->send_pre_del = pbuf;
- if (pbuf->tot_len > remain_size) {
+ if (pbuf->tot_len > remain_size || __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) {
*apiflags &= ~TCP_WRITE_FLAG_MORE;
+ pbuf->head = 1;
return NULL;
}
+ sock->send_lastdata = pbuf->next;
+ pbuf->next = NULL;
return pbuf;
}
-void write_lwip_over(struct lwip_sock *sock, uint32_t n)
+void write_lwip_over(struct lwip_sock *sock)
{
- gazelle_ring_dequeue_over(sock->send_ring, n);
- sock->stack->stats.write_lwip_cnt += n;
+ sock->send_pre_del = NULL;
+ sock->send_all++;
+ sock->stack->stats.write_lwip_cnt++;
}
static inline void del_data_out_event(struct lwip_sock *sock)
@@ -261,21 +317,174 @@ static inline void del_data_out_event(struct lwip_sock *sock)
pthread_spin_unlock(&sock->wakeup->event_list_lock);
}
-void write_stack_over(struct lwip_sock *sock)
+static ssize_t do_app_write(struct pbuf *pbufs[], void *buf, size_t len, uint32_t write_num)
{
- if (sock->send_lastdata) {
- sock->send_lastdata->tot_len = sock->send_lastdata->len = sock->send_datalen;
- sock->send_lastdata = NULL;
+ ssize_t send_len = 0;
+ uint32_t i = 0;
+
+ for (i = 0; i < write_num - 1; i++) {
+ rte_prefetch0(pbufs[i + 1]);
+ rte_prefetch0(pbufs[i + 1]->payload);
+ rte_prefetch0((char *)buf + send_len + MBUF_MAX_DATA_LEN);
+ pbuf_take(pbufs[i], (char *)buf + send_len, MBUF_MAX_DATA_LEN);
+ pbufs[i]->tot_len = pbufs[i]->len = MBUF_MAX_DATA_LEN;
+ send_len += MBUF_MAX_DATA_LEN;
}
+ /* reduce the branch in loop */
+ uint16_t copy_len = len - send_len;
+ pbuf_take(pbufs[i], (char *)buf + send_len, copy_len);
+ pbufs[i]->tot_len = pbufs[i]->len = copy_len;
+ send_len += copy_len;
+
+ return send_len;
+}
+
+static inline ssize_t app_direct_write(struct protocol_stack *stack, struct lwip_sock *sock, void *buf,
+ size_t len, uint32_t write_num)
+{
+ struct pbuf **pbufs = (struct pbuf **)malloc(write_num * sizeof(struct pbuf *));
+ if (pbufs == NULL) {
+ return 0;
+ }
+
+ /* first pbuf get from send_ring. and malloc pbufs attach to first pbuf */
+ if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)&pbufs[1], write_num - 1) != 0) {
+ stack->stats.tx_allocmbuf_fail++;
+ free(pbufs);
+ return 0;
+ }
+
+ (void)gazelle_ring_read(sock->send_ring, (void **)&pbufs[0], 1);
+
+ uint32_t i = 1;
+ for (; i < write_num - 1; i++) {
+ rte_prefetch0(mbuf_to_pbuf((void *)pbufs[i + 1]));
+ pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
+ pbufs[i - 1]->next = pbufs[i];
+ }
+ if (write_num > 1) {
+ pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
+ pbufs[i - 1]->next = pbufs[i];
+ }
+
+ ssize_t send_len = do_app_write(pbufs, buf, len, write_num);
+
gazelle_ring_read_over(sock->send_ring);
- if (sock->wakeup) {
- sock->wakeup->stat.app_write_cnt++;
- if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
- del_data_out_event(sock);
- }
+ pbufs[0]->last = pbufs[write_num - 1];
+ sock->remain_len = 0;
+ free(pbufs);
+ return send_len;
+}
+
+static inline ssize_t app_direct_attach(struct protocol_stack *stack, struct pbuf *attach_pbuf, void *buf,
+ size_t len, uint32_t write_num)
+{
+ struct pbuf **pbufs = (struct pbuf **)malloc(write_num * sizeof(struct pbuf *));
+ if (pbufs == NULL) {
+ return 0;
+ }
+
+ /* first pbuf get from send_ring. and malloc pbufs attach to first pbuf */
+ if (gazelle_alloc_mbuf_with_reserve(stack->rxtx_pktmbuf_pool, (struct rte_mbuf **)pbufs, write_num) != 0) {
+ stack->stats.tx_allocmbuf_fail++;
+ free(pbufs);
+ return 0;
+ }
+
+ pbufs[0] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[0], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
+ uint32_t i = 1;
+ for (; i < write_num - 1; i++) {
+ rte_prefetch0(mbuf_to_pbuf((void *)pbufs[i + 1]));
+ pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
+ pbufs[i - 1]->next = pbufs[i];
}
+ if (write_num > 1) {
+ pbufs[i] = init_mbuf_to_pbuf((struct rte_mbuf *)pbufs[i], PBUF_TRANSPORT, MBUF_MAX_DATA_LEN, PBUF_RAM);
+ pbufs[i - 1]->next = pbufs[i];
+ }
+
+ ssize_t send_len = do_app_write(pbufs, buf, len, write_num);
+
+ attach_pbuf->last->next = pbufs[0];
+ attach_pbuf->last = pbufs[write_num - 1];
+
+ free(pbufs);
+ return send_len;
+}
+
+static inline ssize_t app_buff_write(struct lwip_sock *sock, void *buf, size_t len, uint32_t write_num)
+{
+ struct pbuf *pbufs[SOCK_SEND_RING_SIZE];
+
+ (void)gazelle_ring_read(sock->send_ring, (void **)pbufs, write_num);
+
+ ssize_t send_len = do_app_write(pbufs, buf, len, write_num);
+
+ gazelle_ring_read_over(sock->send_ring);
+
+ sock->remain_len = MBUF_MAX_DATA_LEN - pbufs[write_num - 1]->len;
+ return send_len;
+}
+
+static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
+{
+ struct pbuf *last_pbuf = NULL;
+ volatile uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+ uint32_t last = r->prod.tail - 1;
+ if (last == tail || last - tail > r->capacity) {
+ return NULL;
+ }
+
+ __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1);
+ __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE);
+
+ rte_mb();
+
+ tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+ if (last == tail || last - tail > r->capacity) {
+ __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+ return NULL;
+ }
+
+ return last_pbuf;
+}
+
+static inline void gazelle_ring_lastover(struct pbuf *last_pbuf)
+{
+ __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+}
+
+static inline size_t merge_data_lastpbuf(struct lwip_sock *sock, void *buf, size_t len)
+{
+ struct pbuf *last_pbuf = gazelle_ring_readlast(sock->send_ring);
+ if (last_pbuf == NULL) {
+ sock->remain_len = 0;
+ return 0;
+ }
+
+ if (last_pbuf->next || last_pbuf->len >= MBUF_MAX_DATA_LEN) {
+ sock->remain_len = 0;
+ gazelle_ring_lastover(last_pbuf);
+ return 0;
+ }
+
+ size_t send_len = MBUF_MAX_DATA_LEN - last_pbuf->len;
+ if (send_len >= len) {
+ sock->remain_len = send_len - len;
+ send_len = len;
+ } else {
+ sock->remain_len = 0;
+ }
+
+ uint16_t offset = last_pbuf->len;
+ last_pbuf->tot_len = last_pbuf->len = offset + send_len;
+ pbuf_take_at(last_pbuf, buf, send_len, offset);
+
+ gazelle_ring_lastover(last_pbuf);
+
+ return send_len;
}
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
@@ -284,44 +493,49 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
GAZELLE_RETURN(ENOTCONN);
}
- struct pbuf *pbuf = NULL;
+ struct protocol_stack *stack = sock->stack;
+ struct wakeup_poll *wakeup = sock->wakeup;
+ if (!stack|| len == 0 || !wakeup) {
+ return 0;
+ }
+
ssize_t send_len = 0;
- uint32_t send_pkt = 0;
- while (send_len < len) {
- if (sock->send_lastdata) {
- pbuf = sock->send_lastdata;
- } else {
- if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
- if (sock->wakeup) {
- sock->wakeup->stat.app_write_idlefail++;
- }
- break;
- }
- sock->send_lastdata = pbuf;
- sock->send_datalen = 0;
+ /* merge data into last pbuf */
+ if (sock->remain_len) {
+ send_len = merge_data_lastpbuf(sock, (char *)buf, len);
+ if (send_len >= len) {
+ return len;
}
+ }
- uint16_t remian_len = pbuf->len - sock->send_datalen;
- uint16_t copy_len = (len - send_len > remian_len) ? remian_len : (len - send_len);
- pbuf_take_at(pbuf, (char *)buf + send_len, copy_len, sock->send_datalen);
- sock->send_datalen += copy_len;
- if (sock->send_datalen >= pbuf->len) {
- sock->send_lastdata = NULL;
- pbuf->tot_len = pbuf->len = sock->send_datalen;
- send_pkt++;
- }
+ uint32_t write_num = (len - send_len + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN;
+ uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring);
- send_len += copy_len;
+ /* send_ring is full, data attach last pbuf */
+ if (write_avail == 0) {
+ struct pbuf *last_pbuf = gazelle_ring_readlast(sock->send_ring);
+ if (last_pbuf) {
+ send_len += app_direct_attach(stack, last_pbuf, (char *)buf + send_len, len - send_len, write_num);
+ gazelle_ring_lastover(last_pbuf);
+ wakeup->stat.app_write_cnt += write_num;
+ } else {
+ (void)rpc_call_replenish(stack, sock);
+ wakeup->stat.app_write_rpc++;
+ }
+ sock->remain_len = 0;
+ return send_len;
}
- if (sock->wakeup) {
- sock->wakeup->stat.app_write_cnt += send_pkt;
- }
+ /* send_ring have idle */
+ send_len += (write_num <= write_avail) ? app_buff_write(sock, (char *)buf + send_len, len - send_len, write_num) :
+ app_direct_write(stack, sock, (char *)buf + send_len, len - send_len, write_num);
+ wakeup->stat.app_write_cnt += write_num;
- if (send_len == 0) {
- usleep(100);
+ if (wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
+ del_data_out_event(sock);
}
+
return send_len;
}
@@ -340,6 +554,14 @@ static inline bool replenish_send_ring(struct protocol_stack *stack, struct lwip
return replenish_again;
}
+void rpc_replenish(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p;
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_1].p;
+
+ msg->result = replenish_send_ring(stack, sock);
+}
+
static inline bool do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock, int32_t flags)
{
/* send all send_ring, so len set lwip send max. */
@@ -375,6 +597,7 @@ void stack_send(struct rpc_msg *msg)
list_add_node(&stack->send_list, &sock->send_list);
__atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
}
+
stack->stats.send_self_rpc++;
}
}
@@ -389,6 +612,13 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
list_for_each_safe(node, temp, &stack->send_list) {
sock = container_of(node, struct lwip_sock, send_list);
+ if (++read_num > send_max) {
+ /* list head move to next send */
+ list_del_node(&stack->send_list);
+ list_add_node(&sock->send_list, &stack->send_list);
+ break;
+ }
+
__atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
rte_mb();
@@ -397,22 +627,6 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
continue;
}
- if (tcp_sndbuf(sock->conn->pcb.tcp) < TCP_MSS) {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- continue;
- }
-
- if (!NETCONN_IS_DATAOUT(sock)) {
- replenish_again = replenish_send_ring(stack, sock);
- if (replenish_again) {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- continue;
- }
-
- list_del_node_null(&sock->send_list);
- continue;
- }
-
replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0);
if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) {
@@ -420,10 +634,6 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
} else {
__atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
}
-
- if (++read_num >= send_max) {
- break;
- }
}
}
@@ -491,6 +701,7 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_LWIP);
}
+ sock->recv_all += read_count;
sock->stack->stats.read_lwip_cnt += read_count;
if (recv_len == 0) {
GAZELLE_RETURN(EAGAIN);
@@ -572,7 +783,6 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
if (send <= 0) {
return send;
}
- write_stack_over(sock);
notice_stack_send(sock, fd, send, flags);
return send;
@@ -608,7 +818,6 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
}
if (buflen > 0) {
- write_stack_over(sock);
notice_stack_send(sock, s, buflen, flags);
}
return buflen;
@@ -724,10 +933,16 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
struct lwip_sock *sock;
uint32_t read_num = 0;
- struct list_node *last_node = list->prev;
list_for_each_safe(node, temp, list) {
sock = container_of(node, struct lwip_sock, recv_list);
+ if (++read_num >= max_num) {
+ /* list head move to next send */
+ list_del_node(&stack->recv_list);
+ list_add_node(&sock->recv_list, &stack->recv_list);
+ break;
+ }
+
if (sock->conn == NULL || sock->conn->recvmbox == NULL || rte_ring_count(sock->conn->recvmbox->ring) == 0) {
list_del_node_null(&sock->recv_list);
continue;
@@ -741,11 +956,6 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
} else if (len > 0) {
add_sock_event(sock, EPOLLIN);
}
-
- /* last_node:recv only once per sock. max_num avoid cost too much time this loop */
- if (++read_num >= max_num || last_node == node) {
- break;
- }
}
}
@@ -772,6 +982,19 @@ void gazelle_connected_callback(struct netconn *conn)
add_sock_event(sock, EPOLLOUT);
}
+static uint32_t send_back_count(struct lwip_sock *sock)
+{
+ uint32_t count = 0;
+ struct pbuf *pbuf = sock->send_lastdata;
+
+ while (pbuf) {
+ count++;
+ pbuf = pbuf->next;
+ }
+
+ return count;
+}
+
static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const struct tcp_pcb *pcb)
{
struct netconn *netconn = (struct netconn *)pcb->callback_arg;
@@ -791,8 +1014,10 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
conn->recv_ring_cnt = gazelle_ring_readable_count(sock->recv_ring);
conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0;
-
+ conn->send_back_cnt = send_back_count(sock);
conn->send_ring_cnt = gazelle_ring_readover_count(sock->send_ring);
+ conn->recv_all = sock->recv_all;
+ conn->send_all = sock->send_all;
}
}
}
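
Taken together, the lstack_lwip.c hunks replace the old copy loop, which slept for 100 microseconds whenever it could not obtain an idle ring pbuf, with a non-blocking path: merge into the tail of the previous partially filled pbuf when remain_len allows (merge_data_lastpbuf), then split the rest across ring pbufs, freshly allocated mbufs, or an attach behind the in-flight pbuf, falling back to rpc_call_replenish when nothing is available. The branch selection can be summarised with the following self-contained sketch; classify_write and write_path are hypothetical names, and the authoritative logic remains write_stack_data() above:

#include <stddef.h>
#include <stdint.h>

#define MBUF_MAX_DATA_LEN 1460  /* from gazelle_opt.h in this patch */

/* PATH_ATTACH_OR_RPC : no idle ring pbuf -> app_direct_attach() behind the in-flight
 *                      pbuf, or rpc_call_replenish() when there is no such pbuf
 * PATH_BUFF_WRITE    : enough idle ring pbufs -> app_buff_write(), copy into the ring
 * PATH_DIRECT_WRITE  : too few idle ring pbufs -> app_direct_write(), take one ring
 *                      pbuf and allocate the remainder straight from the mempool     */
enum write_path { PATH_ATTACH_OR_RPC, PATH_BUFF_WRITE, PATH_DIRECT_WRITE };

static enum write_path classify_write(size_t len, uint32_t write_avail)
{
    /* ceiling division, as in write_stack_data(): one pbuf per MSS of payload */
    uint32_t write_num = (uint32_t)((len + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN);

    if (write_avail == 0)
        return PATH_ATTACH_OR_RPC;
    return (write_num <= write_avail) ? PATH_BUFF_WRITE : PATH_DIRECT_WRITE;
}

For example, a 4 KB send (write_num = 3) with only two idle pbufs readable takes the app_direct_write() branch; with three or more it stays entirely inside the ring.
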
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 6261fa9..45f84a7 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -106,7 +106,7 @@ static void get_wakeup_stat(struct protocol_stack_group *stack_group, struct pro
stat->app_events += wakeup->stat.app_events;
stat->read_null += wakeup->stat.read_null;
stat->app_write_cnt += wakeup->stat.app_write_cnt;
- stat->app_write_idlefail += wakeup->stat.app_write_idlefail;
+ stat->app_write_rpc += wakeup->stat.app_write_rpc;
stat->app_read_cnt += wakeup->stat.app_read_cnt;
}
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 295baf3..29ca4e4 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -440,6 +440,19 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
return rpc_sync_call(&stack->rpc_queue, msg);
}
+int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_replenish);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ msg->args[MSG_ARG_0].p = stack;
+ msg->args[MSG_ARG_1].p = sock;
+
+ return rpc_sync_call(&stack->rpc_queue, msg);
+}
+
int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
{
struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index 8d68e06..c042ef5 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -15,13 +15,15 @@
#include "gazelle_opt.h"
-#define RXTX_NB_MBUF (128 * 2000) /* mbuf per connect * connect num */
+#define RXTX_NB_MBUF (256 * 2000) /* mbuf per connect * connect num. size of mbuf is 2536 Byte */
#define RXTX_CACHE_SZ (VDEV_RX_QUEUE_SZ)
#define KNI_NB_MBUF (DEFAULT_RING_SIZE << 4)
-#define MBUF_HEADER_LEN 64
+#define RESERVE_NIC_RECV (1024)
-#define MAX_PACKET_SZ 2048
+#define MBUF_HEADER_LEN 64
+
+#define MAX_PACKET_SZ 2048
#define RING_SIZE(x) ((x) - 1)
@@ -37,6 +39,7 @@ int thread_affinity_init(int cpu_id);
struct protocol_stack;
struct rte_mempool;
struct rte_ring;
+struct rte_mbuf;
int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, uint32_t mbuf_num);
int32_t dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num);
@@ -49,5 +52,6 @@ void dpdk_skip_nic_init(void);
int32_t dpdk_init_lstack_kni(void);
void dpdk_restore_pci(void);
bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+int32_t gazelle_alloc_mbuf_with_reserve(struct rte_mempool *pool, struct rte_mbuf **mbufs, unsigned count);
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 968eff2..b24006a 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -15,19 +15,20 @@
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata))
-#define NETCONN_IS_DATAOUT(sock) gazelle_ring_readover_count((sock)->send_ring)
+#define NETCONN_IS_DATAOUT(sock) (gazelle_ring_readover_count((sock)->send_ring) || (sock)->send_lastdata || (sock)->send_pre_del)
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
struct lwip_sock;
struct rte_mempool;
struct rpc_msg;
struct rte_mbuf;
+struct protocol_stack;
void create_shadow_fd(struct rpc_msg *msg);
void gazelle_init_sock(int32_t fd);
int32_t gazelle_socket(int domain, int type, int protocol);
void gazelle_clean_sock(int32_t fd);
struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags);
-void write_lwip_over(struct lwip_sock *sock, uint32_t n);
+void write_lwip_over(struct lwip_sock *sock);
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
@@ -39,10 +40,12 @@ void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
void stack_send(struct rpc_msg *msg);
+void app_rpc_write(struct rpc_msg *msg);
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num);
void gazelle_free_pbuf(struct pbuf *pbuf);
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags);
ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags);
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags);
+void rpc_replenish(struct rpc_msg *msg);
#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 6928f98..2c1202e 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -51,6 +51,7 @@ struct rpc_msg {
struct protocol_stack;
struct rte_mbuf;
struct wakeup_poll;
+struct lwip_sock;
void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
@@ -75,5 +76,6 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle
int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen);
int32_t rpc_call_fcntl(int fd, int cmd, long val);
int32_t rpc_call_ioctl(int fd, long cmd, void *argp);
+int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock);
#endif
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 87fe9ae..7984ded 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -82,7 +82,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
}
}
-#define READ_PKTS_MAX 32
+#define READ_PKTS_MAX 128
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
@@ -183,7 +183,7 @@ static err_t eth_dev_output(struct netif *netif, struct pbuf *pbuf)
if (likely(first_mbuf->pkt_len > MBUF_MAX_LEN)) {
mbuf->ol_flags |= RTE_MBUF_F_TX_TCP_SEG;
- mbuf->tso_segsz = TCP_MSS;
+ mbuf->tso_segsz = MBUF_MAX_DATA_LEN;
}
mbuf->l2_len = first_pbuf->l2_len;
mbuf->l3_len = first_pbuf->l3_len;
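
Raising READ_PKTS_MAX to 128 lets a single poll drain a larger RX burst, and setting tso_segsz to MBUF_MAX_DATA_LEN keeps hardware TSO cutting oversized packets on the same 1460-byte boundaries the app-side copy now uses, presumably so the segments leaving the NIC match the per-pbuf payload size. As a worked example, a chained packet carrying 5840 bytes of TCP payload would be segmented into 5840 / 1460 = 4 wire segments.
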
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index c505822..54839af 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -568,7 +568,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("read_lwip_drop: %-13"PRIu64" \n", lstack_stat->data.pkts.stack_stat.read_lwip_drop);
printf("app_write: %-18"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_write_cnt);
printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt);
- printf("app_get_idlefail: %-11"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_idlefail);
+ printf("app_write_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_rpc);
printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt);
printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt);
printf("conn_num: %-19hu \n", lstack_stat->data.pkts.conn_num);
@@ -875,8 +875,8 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
printf("Active Internet connections (servers and established)\n");
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
- printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt fd Local Address "
- " Foreign Address State\n");
+ printf("No. Proto lwip_recv recv_ring recv_all in_send send_ring send_back_cnt send_all "
+ "fd Local Address Foreign Address State\n");
uint32_t unread_pkts = 0;
uint32_t unsend_pkts = 0;
for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) {
@@ -885,20 +885,21 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
rip.s_addr = conn_info->rip;
lip.s_addr = conn_info->lip;
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
- printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%-7d%s:%hu %s:%hu %s\n", i, conn_info->recv_cnt,
- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt,
+ printf("%-6utcp %-11u%-11u%-10lu%-9u%-11u%-15d%-10lu%-7d%s:%hu %s:%hu %s\n", i,
+ conn_info->recv_cnt, conn_info->recv_ring_cnt, conn_info->recv_all, conn_info->in_send,
+ conn_info->send_ring_cnt, conn_info->send_back_cnt, conn_info->send_all,
conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-50u%-7d%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-57u%-7d%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt,
conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state);
}
- unread_pkts += conn_info->recv_ring_cnt;
- unsend_pkts += conn_info->send_ring_cnt;
+ unread_pkts += conn_info->recv_ring_cnt + conn_info->recv_cnt;
+ unsend_pkts += conn_info->send_ring_cnt + conn_info->in_send + conn_info->send_back_cnt;
}
if (conn->conn_num > 0) {
printf("Total unread pkts:%u unsend pkts:%u\n", unread_pkts, unsend_pkts);
--
2.23.0