代码拉取完成,页面将自动刷新
同步操作将从 compile_success/gazelle_kylin_src 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 388b2202a0b248026e77ef2c340144ed547c87b7 Mon Sep 17 00:00:00 2001
From: jiangheng12 <jiangheng14@huawei.com>
Date: Wed, 14 Jun 2023 11:58:01 +0800
Subject: [PATCH] fix udp send/recv in muliple queue
---
src/lstack/api/lstack_wrap.c | 37 ++++++++++++++++++--
src/lstack/core/lstack_dpdk.c | 2 +-
src/lstack/core/lstack_lwip.c | 36 +++++++++++++++-----
src/lstack/core/lstack_protocol_stack.c | 39 ++++++++++++++++++++++
src/lstack/include/lstack_protocol_stack.h | 4 +++
5 files changed, 106 insertions(+), 12 deletions(-)
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 7245873..1f33e13 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -236,7 +236,11 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
}
}
- return rpc_call_bind(s, name, namelen);
+ if (NETCONN_IS_UDP(sock) && get_global_cfg_params()->listen_shadow) {
+ return stack_broadcast_bind(s, name, namelen);
+ } else {
+ return stack_single_bind(s, name, namelen);
+ }
}
bool is_dst_ip_localhost(const struct sockaddr *addr)
@@ -548,6 +552,31 @@ static inline ssize_t do_sendmsg(int32_t s, const struct msghdr *message, int32_
return posix_api->send_msg(s, message, flags);
}
+static inline ssize_t udp_recvfrom(struct lwip_sock *sock, int32_t sockfd, void *buf, size_t len, int32_t flags,
+ struct sockaddr *addr, socklen_t *addrlen)
+{
+ int32_t ret;
+
+ do {
+ ret = read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+ if (ret > 0) {
+ return ret;
+ }
+ if (ret <= 0 && errno != EAGAIN) {
+ return -1;
+ }
+ sock = sock->listen_next;
+ sockfd = sock->conn->socket;
+ } while (sock != NULL);
+ GAZELLE_RETURN(EAGAIN);
+}
+
+static inline ssize_t tcp_recvfrom(struct lwip_sock *sock, int32_t sockfd, void *buf, size_t len, int32_t flags,
+ struct sockaddr *addr, socklen_t *addrlen)
+{
+ return read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+}
+
static inline ssize_t do_recvfrom(int32_t sockfd, void *buf, size_t len, int32_t flags,
struct sockaddr *addr, socklen_t *addrlen)
{
@@ -561,7 +590,11 @@ static inline ssize_t do_recvfrom(int32_t sockfd, void *buf, size_t len, int32_t
struct lwip_sock *sock = NULL;
if (select_path(sockfd, &sock) == PATH_LWIP) {
- return read_stack_data(sockfd, buf, len, flags, addr, addrlen);
+ if (NETCONN_IS_UDP(sock)) {
+ return udp_recvfrom(sock, sockfd, buf, len, flags, addr, addrlen);
+ } else {
+ return tcp_recvfrom(sock, sockfd, buf, len, flags, addr, addrlen);
+ }
}
return posix_api->recv_from(sockfd, buf, len, flags, addr, addrlen);
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index b321c18..169025c 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -362,7 +362,7 @@ uint64_t get_eth_params_tx_ol(void)
static int eth_params_rss(struct rte_eth_conf *conf, struct rte_eth_dev_info *dev_info)
{
int rss_enable = 0;
- uint64_t def_rss_hf = ETH_RSS_TCP | ETH_RSS_IP;
+ uint64_t def_rss_hf = ETH_RSS_TCP | ETH_RSS_UDP | ETH_RSS_IP;
struct rte_eth_rss_conf rss_conf = {
g_default_rss_key,
RSS_HASH_KEY_LEN,
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 2e7a67a..34b4aa7 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -1230,13 +1230,19 @@ static inline void clone_lwip_socket_opt(struct lwip_sock *dst_sock, struct lwip
dst_sock->conn->pcb.ip->so_options = src_sock->conn->pcb.ip->so_options;
dst_sock->conn->pcb.ip->ttl = src_sock->conn->pcb.ip->ttl;
dst_sock->conn->pcb.ip->tos = src_sock->conn->pcb.ip->tos;
- dst_sock->conn->pcb.tcp->netif_idx = src_sock->conn->pcb.tcp->netif_idx;
- dst_sock->conn->pcb.tcp->flags = src_sock->conn->pcb.tcp->flags;
- dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
- dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
- dst_sock->conn->pcb.tcp->keep_intvl = src_sock->conn->pcb.tcp->keep_intvl;
- dst_sock->conn->pcb.tcp->keep_cnt = src_sock->conn->pcb.tcp->keep_cnt;
dst_sock->conn->flags = src_sock->conn->flags;
+ if (NETCONN_IS_UDP(src_sock)) {
+ dst_sock->conn->pcb.udp->flags = src_sock->conn->pcb.udp->flags;
+ dst_sock->conn->pcb.udp->mcast_ifindex = src_sock->conn->pcb.udp->mcast_ifindex;
+ dst_sock->conn->pcb.udp->mcast_ttl = src_sock->conn->pcb.udp->mcast_ttl;
+ } else {
+ dst_sock->conn->pcb.tcp->netif_idx = src_sock->conn->pcb.tcp->netif_idx;
+ dst_sock->conn->pcb.tcp->flags = src_sock->conn->pcb.tcp->flags;
+ dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
+ dst_sock->conn->pcb.tcp->keep_idle = src_sock->conn->pcb.tcp->keep_idle;
+ dst_sock->conn->pcb.tcp->keep_intvl = src_sock->conn->pcb.tcp->keep_intvl;
+ dst_sock->conn->pcb.tcp->keep_cnt = src_sock->conn->pcb.tcp->keep_cnt;
+ }
}
int32_t gazelle_socket(int domain, int type, int protocol)
@@ -1265,16 +1271,28 @@ void create_shadow_fd(struct rpc_msg *msg)
struct sockaddr *addr = msg->args[MSG_ARG_1].p;
socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
- int32_t clone_fd = gazelle_socket(AF_INET, SOCK_STREAM, 0);
+ int32_t clone_fd = 0;
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
+ msg->result = -1;
+ return;
+ }
+
+ if (NETCONN_IS_UDP(sock)) {
+ clone_fd = gazelle_socket(AF_INET, SOCK_DGRAM, 0);
+ } else {
+ clone_fd = gazelle_socket(AF_INET, SOCK_STREAM, 0);
+ }
+
if (clone_fd < 0) {
LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
msg->result = clone_fd;
return;
}
- struct lwip_sock *sock = get_socket_by_fd(fd);
struct lwip_sock *clone_sock = get_socket_by_fd(clone_fd);
- if (sock == NULL || clone_sock == NULL) {
+ if (clone_sock == NULL) {
LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
msg->result = -1;
return;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 6c96555..52a0c8f 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -935,6 +935,45 @@ static void inline del_accept_in_event(struct lwip_sock *sock)
pthread_spin_unlock(&sock->wakeup->event_list_lock);
}
+/* choice one stack bind */
+int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
+{
+ return rpc_call_bind(fd, name, namelen);
+}
+
+/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
+int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
+{
+ struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+ struct protocol_stack *stack = NULL;
+ int32_t ret, clone_fd;
+
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EINVAL);
+ }
+
+ ret = rpc_call_bind(fd, name, namelen);
+ if (ret < 0) {
+ close(fd);
+ return ret;
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ for (int32_t i = 0; i < stack_group->stack_num; ++i) {
+ stack = stack_group->stacks[i];
+ if (stack != cur_stack) {
+ clone_fd = rpc_call_shadow_fd(stack, fd, name, namelen);
+ if (clone_fd < 0) {
+ stack_broadcast_close(fd);
+ return clone_fd;
+ }
+ }
+ }
+ return 0;
+}
+
/* ergodic the protocol stack thread to find the connection, because all threads are listening */
int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 3a447dc..a23ddff 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -128,6 +128,10 @@ int32_t stack_broadcast_close(int32_t fd);
int32_t stack_broadcast_listen(int32_t fd, int backlog);
int32_t stack_single_listen(int32_t fd, int32_t backlog);
+/* bind sync to all protocol stack thread, only for udp protocol */
+int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
+int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
+
/* ergodic the protocol stack thread to find the connection, because all threads are listening */
int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags);
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。