代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 472e2f00b3fda7dad4396704fd94715d91be4642 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Wed, 21 Feb 2024 04:25:43 +0800
Subject: [PATCH] diff udp and tcp read from stack
---
src/lstack/core/lstack_lwip.c | 211 +++++++++++++++++++++++-----------
1 file changed, 146 insertions(+), 65 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 50a3389..0b339fe 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -831,74 +831,96 @@ static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
return pbuf;
}
-ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
- struct sockaddr *addr, socklen_t *addrlen)
+static bool recv_break_for_err(struct lwip_sock *sock)
{
- size_t recv_left = len;
- struct pbuf *pbuf = NULL;
- ssize_t recvd = 0;
- uint32_t copy_len;
- struct lwip_sock *sock = get_socket_by_fd(fd);
- bool latency_enable = get_protocol_stack_group()->latency_start;
-
- if (sock->errevent > 0 && !NETCONN_IS_DATAIN(sock)) {
- errno = err_to_errno(netconn_err(sock->conn));
- return -1;
- }
+ bool break_wait = (sock->errevent > 0) && (!NETCONN_IS_DATAIN(sock));
+ errno = err_to_errno(netconn_err(sock->conn));
+ return break_wait;
+}
- thread_bind_stack(sock);
+static void recv_block_wait(struct lwip_sock *sock)
+{
+ lstack_block_wait(sock->wakeup);
+}
- if (sock->same_node_rx_ring != NULL) {
- return gazelle_same_node_ring_recv(sock, buf, len, flags);
+/*
+ * return 0 on success, -1 on error
+ * pbuf maybe NULL(tcp fin packet)
+ */
+static int recv_ring_get_one(struct lwip_sock *sock, bool noblock, struct pbuf **pbuf)
+{
+ if (sock->recv_lastdata != NULL) {
+ *pbuf = sock->recv_lastdata;
+ sock->recv_lastdata = NULL;
+ return 0;
}
- while (recv_left > 0) {
- if (sock->recv_lastdata) {
- pbuf = sock->recv_lastdata;
- sock->recv_lastdata = NULL;
+ if (noblock) {
+ if (gazelle_ring_read(sock->recv_ring, (void **)pbuf, 1) != 1) {
+ errno = EAGAIN;
+ return -1;
} else {
- if (netconn_is_nonblocking(sock->conn)) {
- if (gazelle_ring_read(sock->recv_ring, (void **)&pbuf, 1) != 1) {
- break;
- }
- } else {
- while (gazelle_ring_read(sock->recv_ring, (void **)&pbuf, 1) != 1 && recvd == 0) {
- /* if the connection is disconnected, recv return 0 */
- if (sock->errevent > 0 && !NETCONN_IS_DATAIN(sock)) {
- errno = err_to_errno(netconn_err(sock->conn));
- return -1;
- }
-
- lstack_block_wait(sock->wakeup);
- }
+ return 0;
+ }
+ } else {
+ while (gazelle_ring_read(sock->recv_ring, (void **)pbuf, 1) != 1) {
+ if (recv_break_for_err(sock)) {
+ return -1;
}
+ recv_block_wait(sock);
}
+ return 0;
+ }
+}
- /* if udp recv a packet whose len is 0, return 0 */
- if (NETCONN_IS_UDP(sock) && pbuf->tot_len == 0) {
- return 0;
+/* return true: fin is read to user, false: pend fin */
+static bool recv_ring_handle_fin(struct lwip_sock *sock, struct pbuf *pbuf, ssize_t recvd)
+{
+ if (pbuf == NULL) {
+ if (recvd > 0) {
+ /* handle data first, then handle fin */
+ sock->recv_lastdata = (void *)&fin_packet;
+ gazelle_ring_read_over(sock->recv_ring);
+ return false;
}
+ gazelle_ring_read_over(sock->recv_ring);
+ return true;
+ }
+ /* pending fin */
+ if (pbuf == (void *)&fin_packet) {
+ return true;
+ }
- /* fin */
- if (unlikely(pbuf == NULL)) {
- if (recvd > 0) {
- /* read data first, then read fin */
- sock->recv_lastdata = (void *)&fin_packet;
- gazelle_ring_read_over(sock->recv_ring);
- break;
- }
- gazelle_ring_read_over(sock->recv_ring);
- return 0;
+ return false;
+}
+
+static ssize_t recv_ring_tcp_read(struct lwip_sock *sock, void *buf, size_t len, bool noblock)
+{
+ ssize_t recvd = 0;
+ size_t recv_left = len;
+ uint32_t copy_len;
+ struct pbuf *pbuf = NULL;
+
+ if (len == 0) {
+ return 0;
+ }
+
+ while (recv_left > 0) {
+ if (recv_ring_get_one(sock, noblock, &pbuf) != 0) {
+ break;
}
- /* pending fin */
- if (unlikely(pbuf == (void *)&fin_packet)) {
- return 0;
+ if (unlikely((pbuf == NULL) || (pbuf == (void *)&fin_packet))) {
+ if (recv_ring_handle_fin(sock, pbuf, recvd)) {
+ return 0;
+ } else {
+ break; /* recvd > 0, pending fin, handle data */
+ }
}
copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : recv_left;
if (copy_len > UINT16_MAX) {
- copy_len = UINT16_MAX;
+ copy_len = UINT16_MAX; /* it's impossible to get here */
}
pbuf_copy_partial(pbuf, (char *)buf + recvd, copy_len, 0);
@@ -907,39 +929,98 @@ ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags
if (pbuf->tot_len > copy_len) {
sock->recv_lastdata = pbuf_free_partial(pbuf, copy_len);
- break;
} else {
if (sock->wakeup) {
sock->wakeup->stat.app_read_cnt += 1;
}
- if (latency_enable) {
+
+ if (get_protocol_stack_group()->latency_start) {
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ_LSTACK);
}
- gazelle_ring_read_over(sock->recv_ring);
- /* in udp, if pbuf remaining len less than copy_len, discard these packets */
- if (recvd > 0 && NETCONN_IS_UDP(sock)) {
- sock->stack->stats.sock_rx_drop++;
- break;
- }
+ gazelle_ring_read_over(sock->recv_ring);
}
}
- /* rte_ring_count reduce lock */
- if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)
- && (!NETCONN_IS_DATAIN(sock))) {
- del_sock_event(sock, EPOLLIN);
+ if (recvd > 0) {
+ errno = 0;
+ } else {
+ recvd = -1;
}
+ return recvd;
+}
+
+static ssize_t recv_ring_udp_read(struct lwip_sock *sock, void *buf, size_t len, bool noblock,
+ struct sockaddr *addr, socklen_t *addrlen)
+{
+ size_t recv_left = len;
+ struct pbuf *pbuf = NULL;
+ uint32_t copy_len;
+
+ sock->recv_lastdata = NULL;
+
+ if (recv_ring_get_one(sock, noblock, &pbuf) != 0) {
+ /* errno have set */
+ return -1;
+ }
+
+ copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : recv_left;
+ pbuf_copy_partial(pbuf, (char *)buf, copy_len, 0);
+ /* drop remaining data if have */
+ gazelle_ring_read_over(sock->recv_ring);
+
if (pbuf && addr && addrlen) {
lwip_sock_make_addr(sock->conn, &(pbuf->addr), pbuf->port, addr, addrlen);
}
- if (recvd == 0) {
+ if (copy_len < pbuf->tot_len) {
+ sock->stack->stats.sock_rx_drop++;
+ }
+
+ if (sock->wakeup) {
+ sock->wakeup->stat.app_read_cnt++;
+ }
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ_LSTACK);
+ }
+
+ return copy_len;
+}
+
+ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
+ struct sockaddr *addr, socklen_t *addrlen)
+{
+ ssize_t recvd = 0;
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+
+ if (recv_break_for_err(sock)) {
+ return -1;
+ }
+
+ thread_bind_stack(sock);
+
+ if (sock->same_node_rx_ring != NULL) {
+ return gazelle_same_node_ring_recv(sock, buf, len, flags);
+ }
+
+ if (NETCONN_IS_UDP(sock)) {
+ recvd = recv_ring_udp_read(sock, buf, len, netconn_is_nonblocking(sock->conn), addr, addrlen);
+ } else {
+ recvd = recv_ring_tcp_read(sock, buf, len, netconn_is_nonblocking(sock->conn));
+ }
+
+ /* rte_ring_count reduce lock */
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)
+ && (!NETCONN_IS_DATAIN(sock))) {
+ del_sock_event(sock, EPOLLIN);
+ }
+
+ if (recvd < 0) {
if (sock->wakeup) {
sock->wakeup->stat.read_null++;
}
- GAZELLE_RETURN(EAGAIN);
+ return -1;
}
return recvd;
}
--
2.27.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。