1 Star 0 Fork 32

misaka00251/gazelle

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0206-add-same-node-ring-for-inter-proces-communication.patch 21.67 KB
一键复制 编辑 原始数据 按行查看 历史
jinag12 提交于 2023-03-18 15:13 . syn add pbuf lock when aggregate pbuf

From a8fbb2cc4f9367e4a83c3611e7a7bdb821504015 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 13 Mar 2023 16:08:13 +0800
Subject: [PATCH] add same node ring for inter-proces communication
---
src/lstack/api/lstack_wrap.c | 21 +-
src/lstack/core/lstack_lwip.c | 377 +++++++++++++++++++--
src/lstack/core/lstack_protocol_stack.c | 5 +
src/lstack/include/lstack_ethdev.h | 1 +
src/lstack/include/lstack_lwip.h | 3 +-
src/lstack/include/lstack_protocol_stack.h | 1 +
src/lstack/netif/lstack_ethdev.c | 2 +-
7 files changed, 381 insertions(+), 29 deletions(-)
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 561c6e4..46cbcec 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -200,6 +200,16 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
return rpc_call_bind(s, name, namelen);
}
+bool is_dst_ip_localhost(const struct sockaddr *addr)
+{
+ struct cfg_params *global_params = get_global_cfg_params();
+ struct sockaddr_in *servaddr = (struct sockaddr_in *) addr;
+ if(global_params->host_addr.addr == servaddr->sin_addr.s_addr){
+ return true;
+ }
+ return false;
+}
+
static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t namelen)
{
if (name == NULL) {
@@ -224,9 +234,14 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
return ret;
}
- ret = posix_api->connect_fn(s, name, namelen);
- if (ret == 0) {
- return ret;
+ char listen_ring_name[RING_NAME_LEN];
+ int remote_port = htons(((struct sockaddr_in *)name)->sin_port);
+ snprintf(listen_ring_name, sizeof(listen_ring_name), "listen_rx_ring_%u", remote_port);
+ if (!is_dst_ip_localhost(name) || rte_ring_lookup(listen_ring_name) == NULL) {
+ ret = posix_api->connect_fn(s, name, namelen);
+ if (ret == 0) {
+ return ret;
+ }
}
return -1;
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 063eea4..60abfe8 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -19,6 +19,7 @@
#include <lwip/pbuf.h>
#include <lwip/priv/tcp_priv.h>
#include <lwip/posix_api.h>
+#include <lwip/tcp.h>
#include <securec.h>
#include <rte_errno.h>
#include <rte_malloc.h>
@@ -106,7 +107,6 @@ static void reset_sock_data(struct lwip_sock *sock)
static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, uint16_t length, pbuf_type type)
{
struct pbuf_custom *pbuf_custom = mbuf_to_pbuf(mbuf);
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
@@ -229,18 +229,11 @@ void gazelle_free_pbuf(struct pbuf *pbuf)
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num)
{
- struct pbuf_custom *pbuf_custom = NULL;
-
int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num);
if (ret != 0) {
return ret;
}
- for (uint32_t i = 0; i < num; i++) {
- pbuf_custom = mbuf_to_pbuf(mbufs[i]);
- pbuf_custom->custom_free_function = gazelle_free_pbuf;
- }
-
return 0;
}
@@ -802,6 +795,93 @@ static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t
}
}
+static inline void del_data_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ /* check again avoid cover event add in stack thread */
+ if (!NETCONN_IS_DATAIN(sock)) {
+ sock->events &= ~EPOLLIN;
+
+ if (sock->events == 0) {
+ list_del_node_null(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+/* process on same node use ring to recv data */
+ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+ unsigned long long cur_begin = sock->same_node_rx_ring->sndbegin;
+ unsigned long long cur_end;
+ unsigned long long index = cur_begin + 1;
+ size_t act_len = 0;
+
+ cur_end = __atomic_load_n(&sock->same_node_rx_ring->sndend, __ATOMIC_ACQUIRE);
+ if (cur_begin == cur_end) {
+ errno = EAGAIN;
+ act_len = -1;
+ goto END;
+ }
+
+ act_len = cur_end - index + 1;
+ act_len = RTE_MIN(act_len, len);
+
+ if ((index & SAME_NODE_RING_MASK) + act_len > SAME_NODE_RING_LEN) {
+ size_t act_len1 = SAME_NODE_RING_LEN - (index & SAME_NODE_RING_MASK);
+ size_t act_len2 = act_len - act_len1;
+ rte_memcpy((char *)buf, (char *)sock->same_node_rx_ring->mz->addr + (index & SAME_NODE_RING_MASK), act_len1);
+ rte_memcpy((char *)buf + act_len1, (char *)sock->same_node_rx_ring->mz->addr, act_len2);
+ } else {
+ rte_memcpy((char *)buf, (char *)sock->same_node_rx_ring->mz->addr + (index & SAME_NODE_RING_MASK), act_len);
+ }
+
+ index += act_len;
+ __atomic_store_n(&sock->same_node_rx_ring->sndbegin, index - 1, __ATOMIC_RELEASE);
+
+END:
+ /* rte_ring_count reduce lock */
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)) {
+ del_data_in_event(sock);
+ }
+ return act_len;
+}
+
+/* processes on same node use ring to send data */
+ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
+{
+ unsigned long long cur_begin = __atomic_load_n(&sock->same_node_tx_ring->sndbegin, __ATOMIC_ACQUIRE);
+ unsigned long long cur_end = sock->same_node_tx_ring->sndend;
+ if (cur_end >= cur_begin + SAME_NODE_RING_LEN) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ unsigned long long index = cur_end + 1;
+ size_t act_len = SAME_NODE_RING_LEN - (cur_end - cur_begin);
+ act_len = RTE_MIN(act_len, len);
+
+ if ((index & SAME_NODE_RING_MASK) + act_len > SAME_NODE_RING_LEN) {
+ size_t act_len1 = SAME_NODE_RING_LEN - (index & SAME_NODE_RING_MASK);
+ size_t act_len2 = act_len - act_len1;
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr + (index & SAME_NODE_RING_MASK), buf, act_len1);
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr, (char *)buf + act_len1, act_len2);
+ } else {
+ rte_memcpy((char *)sock->same_node_tx_ring->mz->addr + (index & SAME_NODE_RING_MASK), buf, act_len);
+ }
+
+ index += act_len;
+ __atomic_store_n(&sock->same_node_tx_ring->sndend, index - 1, __ATOMIC_RELEASE);
+ if (act_len == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return act_len;
+}
+
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
{
if (buf == NULL) {
@@ -813,6 +893,9 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
}
struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock->same_node_tx_ring != NULL) {
+ return gazelle_same_node_ring_send(sock, buf, len, flags);
+ }
ssize_t send = write_stack_data(sock, buf, len);
if (send <= 0) {
return send;
@@ -857,22 +940,6 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
return buflen;
}
-static inline void del_data_in_event(struct lwip_sock *sock)
-{
- pthread_spin_lock(&sock->wakeup->event_list_lock);
-
- /* check again avoid cover event add in stack thread */
- if (!NETCONN_IS_DATAIN(sock)) {
- sock->events &= ~EPOLLIN;
-
- if (sock->events == 0) {
- list_del_node_null(&sock->event_list);
- }
- }
-
- pthread_spin_unlock(&sock->wakeup->event_list_lock);
-}
-
static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
{
uint32_t tot_len = pbuf->tot_len - free_len;
@@ -906,6 +973,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
return 0;
}
+ if (sock->same_node_rx_ring != NULL) {
+ return gazelle_same_node_ring_recv(sock, buf, len, flags);
+ }
+
while (recv_left > 0) {
if (sock->recv_lastdata) {
pbuf = sock->recv_lastdata;
@@ -962,6 +1033,21 @@ void add_recv_list(int32_t fd)
}
}
+void read_same_node_recv_list(struct protocol_stack *stack)
+{
+ struct list_node *list = &(stack->same_node_recv_list);
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+
+ list_for_each_safe(node, temp, list) {
+ sock = container_of(node, struct lwip_sock, recv_list);
+
+ if (sock->same_node_rx_ring != NULL && same_node_ring_count(sock)) {
+ add_sock_event(sock, EPOLLIN);
+ }
+ }
+}
+
void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
{
struct list_node *list = &(stack->recv_list);
@@ -1216,3 +1302,246 @@ void stack_recvlist_count(struct rpc_msg *msg)
msg->result = get_list_count(&stack->recv_list);
}
+
+void netif_poll(struct netif *netif)
+{
+ struct tcp_pcb *pcb = NULL;
+ struct tcp_pcb_listen *pcbl = NULL;
+
+ for (pcb = tcp_active_pcbs; pcb != NULL; pcb = pcb->next) {
+#define NETIF_POLL_READ_COUNT 32
+ struct pbuf *pbufs[NETIF_POLL_READ_COUNT];
+ int ret;
+
+ if (pcb->client_rx_ring != NULL) {
+ ret = rte_ring_sc_dequeue_burst(pcb->client_rx_ring, (void **)pbufs, NETIF_POLL_READ_COUNT, NULL);
+ for (int i = 0; i < ret; i++) {
+ if (ip_input(pbufs[i], netif) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "netif_poll: ip_input return err\n");
+ pbuf_free(pbufs[i]);
+ }
+ }
+ }
+ }
+ for (pcbl = tcp_listen_pcbs.listen_pcbs; pcbl != NULL; pcbl = pcbl->next) {
+ if (pcbl->listen_rx_ring != NULL) {
+ struct pbuf *pbuf;
+ if (rte_ring_sc_dequeue(pcbl->listen_rx_ring, (void **)&pbuf) == 0) {
+ if (ip_input(pbuf, netif) != ERR_OK) {
+ pbuf_free(pbuf);
+ }
+ }
+ }
+ }
+}
+
+/* processes on same node handshake packet use this function */
+err_t netif_loop_output(struct netif *netif, struct pbuf *p)
+{
+ struct tcp_pcb *pcb = p->pcb;
+ struct pbuf *head = NULL;
+
+ if (pcb == NULL || pcb->client_tx_ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pcb is null\n");
+ return ERR_ARG;
+ }
+
+ if (p->next != NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: not support chained pbuf\n");
+ return ERR_ARG;
+ }
+
+ struct tcp_hdr *tcp_hdr = (struct tcp_hdr *)((char *)p->payload + sizeof(struct ip_hdr));
+ uint8_t flags = TCPH_FLAGS(tcp_hdr);
+
+ head = pbuf_alloc(0, p->len, PBUF_RAM);
+ if (head == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "netif_loop_output: pbuf_alloc failed\n");
+ return ERR_MEM;
+ }
+ head->ol_flags = p->ol_flags;
+ memcpy(head->payload, p->payload, p->len);
+
+ if ((flags & TCP_SYN) && !(flags & TCP_ACK)) {
+ /* SYN packet, send to listen_ring */
+ char ring_name[RING_NAME_LEN] = {0};
+ snprintf(ring_name, sizeof(ring_name), "listen_rx_ring_%d", pcb->remote_port);
+ struct rte_ring *ring = rte_ring_lookup(ring_name);
+ if (ring == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "netif_loop_output: cant find listen_rx_ring %d\n", pcb->remote_port);
+ pbuf_free(head);
+ } else {
+ if (rte_ring_mp_enqueue(ring, head) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "enqueue sync packet failed\n");
+ pbuf_free(head);
+ }
+ }
+ } else {
+ /* send other type packet to tx_ring */
+ if (rte_ring_sp_enqueue(pcb->client_tx_ring, head) != 0) {
+ LSTACK_LOG(INFO, LSTACK, "client tx ring full\n");
+ pbuf_free(head);
+ }
+ }
+
+ return ERR_OK;
+}
+
+err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock)
+{
+ char name[RING_NAME_LEN];
+ snprintf(name, sizeof(name), "rte_mz_rx_%u", pcb->remote_port);
+ if ((nsock->same_node_tx_ring_mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+ }
+ nsock->same_node_tx_ring = (struct same_node_ring *)nsock->same_node_tx_ring_mz->addr;
+
+ snprintf(name, sizeof(name), "rte_mz_buf_rx_%u", pcb->remote_port);
+ if ((nsock->same_node_tx_ring->mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ }
+
+ snprintf(name, sizeof(name), "rte_mz_tx_%u", pcb->remote_port);
+ if ((nsock->same_node_rx_ring_mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "lookup %s success\n", name);
+ }
+ nsock->same_node_rx_ring = (struct same_node_ring *)nsock->same_node_rx_ring_mz->addr;
+
+ snprintf(name, sizeof(name), "rte_mz_buf_tx_%u", pcb->remote_port);
+ if ((nsock->same_node_rx_ring->mz = rte_memzone_lookup(name)) == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lwip_accept: can't find %s\n",name);
+ return -1;
+ }
+
+ /* rcvlink init in alloc_socket() */
+ /* remove from g_rcv_process_list in free_socket */
+ list_add_node(&nsock->stack->same_node_recv_list, &nsock->recv_list);
+ return 0;
+}
+
+err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *rx)
+{
+ char mem_name[RING_NAME_LEN] = {0};
+ snprintf(mem_name, sizeof(mem_name), "%s_%s_%u", name, rx, port);
+
+ *zone = rte_memzone_reserve_aligned(mem_name, size, rte_socket_id(), 0, RTE_CACHE_LINE_SIZE);
+ if (*zone == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "cannot reserve memzone:%s, errno is %d\n", mem_name, rte_errno);
+ return ERR_MEM;
+ }
+
+ LSTACK_LOG(INFO, LSTACK, "lstack id %d, reserve %s(%p) success, addr is %p, size is %u\n", rte_socket_id(), mem_name, *zone, (*zone)->addr, size);
+
+ return ERR_OK;
+}
+
+err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx)
+{
+ unsigned flags;
+ char ring_name[RING_NAME_LEN] = {0};
+ if (strcmp(name, "listen") == 0) {
+ flags = RING_F_SC_DEQ;
+ } else {
+ flags = RING_F_SP_ENQ | RING_F_SC_DEQ;
+ }
+
+ snprintf(ring_name, sizeof(ring_name), "%s_%s_ring_%u", name, rx, port);
+ *ring = rte_ring_create(ring_name, size, rte_socket_id(), flags);
+ if (*ring == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "cannot create rte_ring %s, errno is %d\n", ring_name, rte_errno);
+ return ERR_MEM;
+ }
+ LSTACK_LOG(INFO, LSTACK, "lstack socket id:%d, create %s(%p) success\n", rte_socket_id(), ring_name, *ring);
+ return ERR_OK;
+}
+
+static void init_same_node_ring(struct tcp_pcb *pcb)
+{
+ struct netconn *netconn = (struct netconn *)pcb->callback_arg;
+ struct lwip_sock *sock = get_socket(netconn->socket);
+
+ pcb->client_rx_ring = NULL;
+ pcb->client_tx_ring = NULL;
+ pcb->free_ring = 0;
+ sock->same_node_rx_ring = NULL;
+ sock->same_node_rx_ring_mz = NULL;
+ sock->same_node_tx_ring = NULL;
+ sock->same_node_tx_ring_mz = NULL;
+}
+
+#define CLIENT_RING_SIZE 512
+err_t create_same_node_ring(struct tcp_pcb *pcb)
+{
+ struct netconn *netconn = (struct netconn *)pcb->callback_arg;
+ struct lwip_sock *sock = get_socket(netconn->socket);
+
+ if (same_node_ring_create(&pcb->client_rx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "rx") != 0) {
+ goto END;
+ }
+ if (same_node_ring_create(&pcb->client_tx_ring, CLIENT_RING_SIZE, pcb->local_port, "client", "tx") != 0) {
+ goto END;
+ }
+ pcb->free_ring = 1;
+
+ if (same_node_memzone_create(&sock->same_node_rx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "rx") != 0) {
+ goto END;
+ }
+ sock->same_node_rx_ring = (struct same_node_ring*)sock->same_node_rx_ring_mz->addr;
+
+ if (same_node_memzone_create(&sock->same_node_rx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "rx") != 0) {
+ goto END;
+ }
+
+ sock->same_node_rx_ring->sndbegin = 0;
+ sock->same_node_rx_ring->sndend = 0;
+
+ if (same_node_memzone_create(&sock->same_node_tx_ring_mz, sizeof(struct same_node_ring), pcb->local_port, "rte_mz", "tx") != 0) {
+ goto END;
+ }
+ sock->same_node_tx_ring = (struct same_node_ring*)sock->same_node_tx_ring_mz->addr;
+
+ if (same_node_memzone_create(&sock->same_node_tx_ring->mz, SAME_NODE_RING_LEN, pcb->local_port, "rte_mz_buf", "tx") != 0) {
+ goto END;
+ }
+
+ sock->same_node_tx_ring->sndbegin = 0;
+ sock->same_node_tx_ring->sndend = 0;
+
+ return 0;
+END:
+ rte_ring_free(pcb->client_rx_ring);
+ rte_ring_free(pcb->client_tx_ring);
+ rte_memzone_free(sock->same_node_rx_ring->mz);
+ rte_memzone_free(sock->same_node_rx_ring_mz);
+ rte_memzone_free(sock->same_node_tx_ring->mz);
+ rte_memzone_free(sock->same_node_tx_ring_mz);
+ init_same_node_ring(pcb);
+ return ERR_BUF;
+}
+
+err_t find_same_node_ring(struct tcp_pcb *npcb)
+{
+ char name[RING_NAME_LEN] = {0};
+ snprintf(name, sizeof(name), "client_tx_ring_%u", npcb->remote_port);
+ npcb->client_rx_ring = rte_ring_lookup(name);
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "client_rx_ring_%u", npcb->remote_port);
+ npcb->client_tx_ring = rte_ring_lookup(name);
+ npcb->free_ring = 0;
+ if (npcb->client_tx_ring == NULL ||
+ npcb->client_rx_ring == NULL) {
+ LSTACK_LOG(INFO, LSTACK, "lookup client rxtx ring failed, port is %d\n", npcb->remote_port);
+ tcp_abandon(npcb, 0);
+ return ERR_CONN;
+ } else {
+ LSTACK_LOG(INFO, LSTACK, "find client_tx_ring_%u and client_rx_ring_%u\n", npcb->remote_port, npcb->remote_port);
+ }
+ return 0;
+}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 48eff1d..0d7b7f0 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -314,6 +314,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
+ init_list_node(&stack->same_node_recv_list);
init_list_node(&stack->wakeup_list);
sys_calibrate_tsc();
@@ -497,6 +498,10 @@ static void* gazelle_stack_thread(void *arg)
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
+ /* reduce traversal times */
+ if ((wakeup_tick & 0xff) == 0) {
+ read_same_node_recv_list(stack);
+ }
read_recv_list(stack, read_connect_number);
if ((wakeup_tick & 0xf) == 0) {
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index a690adb..55cf769 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -45,5 +45,6 @@ void delete_user_process_port(uint16_t dst_port, enum port_type type);
void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void netif_poll(struct netif *netif);
#endif /* __GAZELLE_ETHDEV_H__ */
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 02110e0..d52a06d 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -14,7 +14,7 @@
#define __GAZELLE_LWIP_H__
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
-#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata))
+#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata) || (sock->same_node_rx_ring != NULL && same_node_ring_count(sock)))
#define NETCONN_IS_DATAOUT(sock) (gazelle_ring_readover_count((sock)->send_ring) || (sock)->send_lastdata || (sock)->send_pre_del)
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
@@ -33,6 +33,7 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len);
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
+void read_same_node_recv_list(struct protocol_stack *stack);
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
void get_lwip_conntable(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index d58c98a..3691250 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -71,6 +71,7 @@ struct protocol_stack {
struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT];
struct list_node recv_list;
+ struct list_node same_node_recv_list; /* used for same node processes communication */
struct list_node wakeup_list;
volatile uint16_t conn_num;
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 60ea897..01b1280 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -82,7 +82,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
len = (uint16_t)rte_pktmbuf_data_len(m);
payload = rte_pktmbuf_mtod(m, void *);
pc = mbuf_to_pbuf(m);
- pc->custom_free_function = gazelle_free_pbuf;
next = pbuf_alloced_custom(PBUF_RAW, (uint16_t)len, PBUF_RAM, pc, payload, (uint16_t)len);
if (next == NULL) {
stack->stats.rx_allocmbuf_fail++;
@@ -653,6 +652,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
{
uint32_t nr_pkts;
+ netif_poll(&stack->netif);
nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number);
if (nr_pkts == 0) {
return 0;
--
2.23.0
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/misaka00251/gazelle.git
git@gitee.com:misaka00251/gazelle.git
misaka00251
gazelle
gazelle
master

搜索帮助