代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From c109336bcf860c9dd16ba8becd9de72ecdce4d8f Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Tue, 11 Jun 2024 14:15:49 +0800
Subject: [PATCH] refactor udp send distinguish tcp/udp get_from_sendring
cancel the restrictioin that maximum of 2 rpc msg can be send over the same
udp sock
---
src/lstack/core/lstack_lwip.c | 107 ++++++++++++++++--------
src/lstack/core/lstack_protocol_stack.c | 39 ++++++++-
src/lstack/core/lstack_thread_rpc.c | 29 ++++++-
src/lstack/include/lstack_lwip.h | 3 +-
src/lstack/include/lstack_rpc_proc.h | 3 +-
src/lstack/include/lstack_thread_rpc.h | 3 +-
6 files changed, 141 insertions(+), 43 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 153c5cc..db948b0 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -101,7 +101,7 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
if (pbuf) {
- pbuf->allow_in = 1;
+ pbuf->allow_append = 1;
pbuf->addr = *IP_ANY_TYPE;
pbuf->port = 0;
pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED);
@@ -227,24 +227,61 @@ struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type typ
return init_mbuf_to_pbuf(mbuf, layer, length, type);
}
-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
+static inline bool pbuf_allow_append(struct pbuf *pbuf, uint16_t remain_size)
+{
+ pthread_spin_lock(&pbuf->pbuf_lock);
+ if (pbuf->tot_len > remain_size) {
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ return false;
+ }
+ if (pbuf->allow_append == 1) {
+ __sync_fetch_and_sub(&pbuf->allow_append, 1);
+ }
+
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ return true;
+}
+
+struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size)
+{
+ int count;
+ /* when remain_size is 0, fill_sendring write one pbuf to sendring */
+ if (remain_size == 0) {
+ count = 1;
+ } else {
+ count = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN;
+ }
+
+ struct pbuf *pbufs[count];
+
+ int actual_count = gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbufs, count);
+ if (unlikely(actual_count != count)) {
+ LSTACK_LOG(ERR, LSTACK, "udp get pbuf from sendring error, expected: %d, actual: %d\n",
+ count, actual_count);
+ }
+
+ if (unlikely(pbufs[0]->tot_len != remain_size)) {
+ LSTACK_LOG(ERR, LSTACK, "udp get pbuf size error, expected: %d, actual: %d\n",
+ remain_size, pbufs[0]->tot_len);
+ }
+
+ for (int i = 0; get_protocol_stack_group()->latency_start && i < count; i++) {
+ calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_WRITE_LWIP, 0);
+ }
+
+ return pbufs[0];
+}
+
+struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size)
{
struct pbuf *pbuf = NULL;
if (unlikely(sock->send_pre_del)) {
- pbuf = sock->send_pre_del;
- pthread_spin_lock(&pbuf->pbuf_lock);
- if (pbuf->tot_len > remain_size) {
- pthread_spin_unlock(&pbuf->pbuf_lock);
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ if (pbuf_allow_append(sock->send_pre_del, remain_size)) {
+ return sock->send_pre_del;
+ } else {
return NULL;
}
- if (pbuf->allow_in == 1) {
- __sync_fetch_and_sub(&pbuf->allow_in, 1);
- }
- pthread_spin_unlock(&pbuf->pbuf_lock);
-
- return pbuf;
}
gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1);
@@ -252,17 +289,6 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s
return NULL;
}
- /* udp send a pbuf chain, dequeue all pbufs except head pbuf */
- if (NETCONN_IS_UDP(sock) && remain_size > MBUF_MAX_DATA_LEN) {
- int size = (remain_size + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN - 1;
- struct pbuf *pbuf_used[size];
- gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf_used, size);
-
- for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < size; i++) {
- calculate_lstack_latency(&sock->stack->latency, pbuf_used[i], GAZELLE_LATENCY_WRITE_LWIP, 0);
- }
- }
-
if (get_protocol_stack_group()->latency_start) {
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_WRITE_LWIP, 0);
}
@@ -270,19 +296,11 @@ struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_s
sock->send_pre_del = pbuf;
if (!gazelle_ring_readover_count(sock->send_ring)) {
- pthread_spin_lock(&pbuf->pbuf_lock);
- if (pbuf->tot_len > remain_size) {
- pthread_spin_unlock(&pbuf->pbuf_lock);
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ if (!pbuf_allow_append(pbuf, remain_size)) {
return NULL;
}
- if (pbuf->allow_in == 1) {
- __sync_fetch_and_sub(&pbuf->allow_in, 1);
- }
- pthread_spin_unlock(&pbuf->pbuf_lock);
} else {
if (pbuf->tot_len > remain_size) {
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
return NULL;
}
}
@@ -388,7 +406,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) {
return NULL;
}
- if (last_pbuf->allow_in != 1) {
+ if (last_pbuf->allow_append != 1) {
pthread_spin_unlock(&last_pbuf->pbuf_lock);
return NULL;
}
@@ -675,17 +693,34 @@ ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int3
return buflen;
}
-static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
+static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
{
// 2: call_num >= 2, don't need add new rpc send
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) {
- while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) {
+ while (rpc_call_tcp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
usleep(1000); // 1000: wait 1ms to exec again
}
__sync_fetch_and_add(&sock->call_num, 1);
}
}
+static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
+{
+ __sync_fetch_and_add(&sock->call_num, 1);
+ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
+ usleep(1000); // 1000: wait 1ms to exec again
+ }
+}
+
+static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
+{
+ if (NETCONN_IS_UDP(sock)) {
+ notice_stack_udp_send(sock, fd, len, flags);
+ } else {
+ notice_stack_tcp_send(sock, fd, len, flags);
+ }
+}
+
/* process on same node use ring to recv data */
ssize_t gazelle_same_node_ring_recv(struct lwip_sock *sock, const void *buf, size_t len, int32_t flags)
{
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index f6d381e..d130c91 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -877,7 +877,7 @@ void stack_recv(struct rpc_msg *msg)
msg->args[MSG_ARG_3].i);
}
-void stack_send(struct rpc_msg *msg)
+void stack_tcp_send(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
size_t len = msg->args[MSG_ARG_1].size;
@@ -913,6 +913,39 @@ void stack_send(struct rpc_msg *msg)
return;
}
+void stack_udp_send(struct rpc_msg *msg)
+{
+ int32_t fd = msg->args[MSG_ARG_0].i;
+ size_t len = msg->args[MSG_ARG_1].size;
+ struct protocol_stack *stack = get_protocol_stack();
+ int replenish_again;
+ uint32_t call_num;
+
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_rpcmsg_latency(&stack->latency, msg, GAZELLE_LATENCY_WRITE_RPC_MSG);
+ }
+
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ msg->result = -1;
+ LSTACK_LOG(ERR, LSTACK, "get sock error! fd=%d, len=%ld\n", fd, len);
+ return;
+ }
+
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
+ call_num = __sync_fetch_and_sub(&sock->call_num, 1);
+ if (replenish_again < 0) {
+ return;
+ }
+
+ if ((call_num == 1) && (replenish_again > 0)) {
+ rpc_call_replenish(&stack->rpc_queue, sock);
+ return;
+ }
+
+ return;
+}
+
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
{
@@ -1040,6 +1073,10 @@ void stack_replenish_sendring(struct rpc_msg *msg)
struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
msg->result = do_lwip_replenish_sendring(stack, sock);
+ if (msg->result == true) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ }
}
void stack_get_conntable(struct rpc_msg *msg)
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 04bdc3a..e438c37 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -460,13 +460,36 @@ int32_t rpc_call_replenish(rpc_queue *queue, void *sock)
}
msg->args[MSG_ARG_0].p = sock;
+ msg->sync_flag = 0;
- return rpc_sync_call(queue, msg);
+ rpc_call(queue, msg);
+ return 0;
+}
+
+int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ if (get_protocol_stack_group()->latency_start) {
+ time_stamp_into_rpcmsg(get_socket_by_fd(fd));
+ }
+
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].size = len;
+ msg->args[MSG_ARG_2].i = flags;
+ msg->sync_flag = 0;
+
+ rpc_call(queue, msg);
+
+ return 0;
}
-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags)
+int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_send);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send);
if (msg == NULL) {
return -1;
}
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index fa10e3f..85c9c20 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -33,7 +33,8 @@ int do_lwip_close(int32_t fd);
void do_lwip_init_sock(int32_t fd);
void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
-struct pbuf *do_lwip_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags);
+struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
+struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
void do_lwip_get_from_sendring_over(struct lwip_sock *sock);
bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h
index 71f0c58..77b18bd 100644
--- a/src/lstack/include/lstack_rpc_proc.h
+++ b/src/lstack/include/lstack_rpc_proc.h
@@ -30,7 +30,8 @@ void stack_getsockopt(struct rpc_msg *msg);
void stack_setsockopt(struct rpc_msg *msg);
void stack_fcntl(struct rpc_msg *msg);
void stack_ioctl(struct rpc_msg *msg);
-void stack_send(struct rpc_msg *msg);
+void stack_tcp_send(struct rpc_msg *msg);
+void stack_udp_send(struct rpc_msg *msg);
void stack_mempool_size(struct rpc_msg *msg);
void stack_rpcpool_size(struct rpc_msg *msg);
void stack_create_shadow_fd(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 276ebb2..fa98b0c 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -83,7 +83,8 @@ int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr,
int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog);
int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags);
+int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
+int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
--
2.33.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。