代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From eedeee26cb113f550339d54238ca41eb012697e3 Mon Sep 17 00:00:00 2001
From: yinbin <yinbin8@huawei.com>
Date: Wed, 31 Jul 2024 20:31:57 +0800
Subject: [PATCH] refector fill udp sendring
---
src/lstack/core/lstack_lwip.c | 109 ++++++++++++++++++------
src/lstack/core/lstack_protocol_stack.c | 11 +--
2 files changed, 84 insertions(+), 36 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 271e94f..19ff22d 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -457,22 +457,67 @@ int sem_timedwait_nsecs(sem_t *sem)
return sem_timedwait(sem, &ts);
}
-static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len,
- const struct sockaddr *addr, socklen_t addrlen)
+static ssize_t do_lwip_udp_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len,
+ const struct sockaddr *addr, socklen_t addrlen)
{
- if (sock->errevent > 0) {
- GAZELLE_RETURN(ENOTCONN);
+ if (len > GAZELLE_UDP_PKGLEN_MAX) {
+ LSTACK_LOG(ERR, LSTACK, "Message too long\n");
+ GAZELLE_RETURN(EMSGSIZE);
+ }
+
+ ssize_t send_len = 0;
+ uint32_t write_num = (len + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN;
+ uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring);
+ struct wakeup_poll *wakeup = sock->wakeup;
+
+ if (write_num > rte_ring_get_capacity(sock->send_ring)) {
+ LSTACK_LOG(ERR, LSTACK, "sock send_ring size is not enough\n");
+ GAZELLE_RETURN(ENOMEM);
+ }
+
+ /* if udp send 0 packet, set write_num to at least 1 */
+ if (write_num == 0) {
+ write_num = 1;
+ }
+
+ while (!netconn_is_nonblocking(sock->conn) && (write_avail < write_num)) {
+ if (sock->errevent > 0) {
+ GAZELLE_RETURN(ENOTCONN);
+ }
+ write_avail = gazelle_ring_readable_count(sock->send_ring);
+ }
+
+ if (write_avail < write_num) {
+ sem_timedwait_nsecs(&sock->snd_ring_sem);
+ GAZELLE_RETURN(ENOMEM);
+ }
+
+ send_len = app_buff_write(sock, (char *)buf, len, write_num, addr, addrlen);
+
+ if (wakeup && wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)
+ && !NETCONN_IS_OUTIDLE(sock)) {
+ del_sock_event(sock, EPOLLOUT);
+ }
+
+ if (wakeup) {
+ wakeup->stat.app_write_cnt += write_num;
}
- struct protocol_stack *stack = sock->stack;
- if (!stack) {
+ return send_len;
+}
+
+static ssize_t do_lwip_tcp_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len,
+ const struct sockaddr *addr, socklen_t addrlen)
+{
+ /* refer to the lwip implementation. */
+ if (len == 0) {
return 0;
}
ssize_t send_len = 0;
/* merge data into last pbuf */
- if (!NETCONN_IS_UDP(sock) && sock->remain_len) {
+ if (sock->remain_len) {
sock->stack->stats.sock_tx_merge++;
send_len = merge_data_lastpbuf(sock, (char *)buf, len);
if (send_len >= len) {
@@ -485,11 +530,6 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si
uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring);
struct wakeup_poll *wakeup = sock->wakeup;
- /* if udp send 0 packet, set write_num to at least 1 */
- if (NETCONN_IS_UDP(sock) && write_num == 0) {
- write_num = 1;
- }
-
while (!netconn_is_nonblocking(sock->conn) && (write_avail < write_num)) {
if (sock->errevent > 0) {
GAZELLE_RETURN(ENOTCONN);
@@ -500,9 +540,6 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si
/* send_ring is full, data attach last pbuf */
if (write_avail == 0) {
sem_timedwait_nsecs(&sock->snd_ring_sem);
- if (likely(sock->send_ring != NULL)) {
- write_avail = gazelle_ring_readable_count(sock->send_ring);
- }
goto END;
}
@@ -523,10 +560,11 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si
}
END:
- if (send_len == 0 && !NETCONN_IS_UDP(sock)) {
+ if (send_len == 0) {
errno = EAGAIN;
return -1;
}
+
return send_len;
}
@@ -817,28 +855,39 @@ static inline void thread_bind_stack(struct lwip_sock *sock)
ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t flags,
const struct sockaddr *addr, socklen_t addrlen)
{
+ ssize_t send = 0;
+
if (buf == NULL) {
GAZELLE_RETURN(EINVAL);
}
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (len == 0 && !NETCONN_IS_UDP(sock)) {
- return 0;
- }
-
- if (NETCONN_IS_UDP(sock) && (len > GAZELLE_UDP_PKGLEN_MAX)) {
- LSTACK_LOG(ERR, LSTACK, "Message too long\n");
- GAZELLE_RETURN(EMSGSIZE);
+ if (addr && addr->sa_family != AF_INET && addr->sa_family != AF_INET6) {
+ GAZELLE_RETURN(EINVAL);
}
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
thread_bind_stack(sock);
if (sock->same_node_tx_ring != NULL) {
return gazelle_same_node_ring_send(sock, buf, len, flags);
}
- ssize_t send = do_lwip_fill_sendring(sock, buf, len, addr, addrlen);
- if (send < 0 || (send == 0 && !NETCONN_IS_UDP(sock))) {
- return send;
+ if (sock->errevent > 0 || sock->stack == NULL) {
+ GAZELLE_RETURN(ENOTCONN);
+ }
+
+ if (NETCONN_IS_UDP(sock)) {
+ send = do_lwip_udp_fill_sendring(sock, buf, len, addr, addrlen);
+ /* send = 0: udp send a empty package */
+ if (send < 0) {
+ return send;
+ }
+ } else {
+ send = do_lwip_tcp_fill_sendring(sock, buf, len, addr, addrlen);
+ // send = 0 : tcp peer close connection ?
+ if (send <= 0) {
+ return send;
+ }
}
notice_stack_send(sock, fd, send, flags);
@@ -860,7 +909,11 @@ ssize_t do_lwip_sendmsg_to_stack(struct lwip_sock *sock, int32_t s, const struct
continue;
}
- ret = do_lwip_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0);
+ if (NETCONN_IS_UDP(sock)) {
+ ret = do_lwip_udp_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0);
+ } else {
+ ret = do_lwip_tcp_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0);
+ }
if (ret <= 0) {
buflen = (buflen == 0) ? ret : buflen;
break;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 2867711..f433d0d 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -916,7 +916,6 @@ void stack_udp_send(struct rpc_msg *msg)
size_t len = msg->args[MSG_ARG_1].size;
struct protocol_stack *stack = get_protocol_stack();
int replenish_again;
- uint32_t call_num;
struct lwip_sock *sock = lwip_get_socket(fd);
if (sock == NULL) {
@@ -930,16 +929,12 @@ void stack_udp_send(struct rpc_msg *msg)
}
replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- call_num = __sync_fetch_and_sub(&sock->call_num, 1);
- if (replenish_again < 0) {
- return;
- }
-
- if ((call_num == 1) && (replenish_again > 0)) {
+ if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
rpc_call_replenish(&stack->rpc_queue, sock);
return;
}
-
+
+ __sync_fetch_and_sub(&sock->call_num, 1);
return;
}
--
2.33.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。