代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 40aba0460aade8bb3dd775f459a177487a549384 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Mon, 13 Mar 2023 16:49:25 +0800
Subject: [PATCH] fix send reset by peer when not sleep after connect fix
duplicate port alloc fix poll(NULL, 0, timeout) error fix multi nic, portid
not correct fix add connect failed when transfer pkt to other process fix
kni=0x0 coredump in kni_handle_rx when num_cpus > 1 fix accidental connect
error or slowly
---
src/common/gazelle_opt.h | 2 +
src/lstack/api/lstack_wrap.c | 90 +++++++++++++++++++------
src/lstack/core/lstack_dpdk.c | 6 +-
src/lstack/core/lstack_protocol_stack.c | 34 ++++++----
src/lstack/core/lstack_stack_stat.c | 3 +
src/lstack/include/lstack_ethdev.h | 6 ++
src/lstack/netif/lstack_ethdev.c | 28 ++++----
7 files changed, 124 insertions(+), 45 deletions(-)
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index e278107..4c0eef3 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -94,4 +94,6 @@
#define LSTACK_RECV_THREAD_NAME "lstack_recv"
#define LSTACK_THREAD_NAME "gazellelstack"
+#define SLEEP_US_BEFORE_LINK_UP 10000
+
#endif /* _GAZELLE_OPT_H_ */
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 46cbcec..ecde391 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -12,6 +12,7 @@
#define _GNU_SOURCE
#include <dlfcn.h>
+#include <string.h>
#include <signal.h>
#include <sys/socket.h>
@@ -22,9 +23,11 @@
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <unistd.h>
+#include <net/if.h>
#include <lwip/posix_api.h>
#include <lwip/lwipsock.h>
+#include <lwip/tcp.h>
#include "posix/lstack_epoll.h"
#include "posix/lstack_unistd.h"
@@ -84,6 +87,11 @@ static enum KERNEL_LWIP_PATH select_path(int fd)
return PATH_KERNEL;
}
+ struct tcp_pcb *pcb = sock->conn->pcb.tcp;
+ if (pcb != NULL && pcb->state <= ESTABLISHED) {
+ return PATH_LWIP;
+ }
+
return PATH_UNKNOW;
}
@@ -179,6 +187,28 @@ static int32_t do_accept4(int32_t s, struct sockaddr *addr, socklen_t *addrlen,
return posix_api->accept4_fn(s, addr, addrlen, flags);
}
+#define SIOCGIFADDR 0x8915
+static int get_addr(struct sockaddr_in *sin, char *interface)
+{
+ int sockfd = 0;
+ struct ifreq ifr;
+
+ if ((sockfd = posix_api->socket_fn(AF_INET, SOCK_STREAM, 0)) < 0) return -1;
+
+ memset(&ifr, 0, sizeof(ifr));
+ snprintf(ifr.ifr_name, (sizeof(ifr.ifr_name) - 1), "%s", interface);
+
+ if(posix_api->ioctl_fn(sockfd, SIOCGIFADDR, &ifr) < 0){
+ posix_api->close_fn(sockfd);
+ return -1;
+ }
+ posix_api->close_fn(sockfd);
+
+ memcpy(sin, &ifr.ifr_addr, sizeof(struct sockaddr_in));
+
+ return 0;
+}
+
static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen)
{
if (name == NULL) {
@@ -202,12 +232,36 @@ static int32_t do_bind(int32_t s, const struct sockaddr *name, socklen_t namelen
bool is_dst_ip_localhost(const struct sockaddr *addr)
{
- struct cfg_params *global_params = get_global_cfg_params();
struct sockaddr_in *servaddr = (struct sockaddr_in *) addr;
- if(global_params->host_addr.addr == servaddr->sin_addr.s_addr){
- return true;
+ FILE *ifh = fopen("/proc/net/dev", "r");
+ char *line = NULL;
+ char *p;
+ size_t linel = 0;
+ int linenum = 0;
+ struct sockaddr_in* sin = malloc(sizeof(struct sockaddr_in));
+
+ while (getdelim(&line, &linel, '\n', ifh) > 0) {
+ if (linenum++ < 2) continue;
+
+ p = line;
+ while (isspace(*p))
+ ++p;
+ int n = strcspn(p, ": \t");
+
+ char interface[20] = {0};
+ strncpy(interface, p, n);
+
+ memset(sin, 0, sizeof(struct sockaddr_in));
+ int ret = get_addr(sin, interface);
+ if (ret == 0) {
+ if(sin->sin_addr.s_addr == servaddr->sin_addr.s_addr){
+ return 1;
+ }
+ }
}
- return false;
+ free(sin);
+
+ return 0;
}
static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t namelen)
@@ -229,22 +283,17 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
GAZELLE_RETURN(EINVAL);
}
- int32_t ret = rpc_call_connect(s, name, namelen);
- if (ret == 0 || errno == EISCONN) {
- return ret;
- }
-
+ int32_t ret = 0;
char listen_ring_name[RING_NAME_LEN];
int remote_port = htons(((struct sockaddr_in *)name)->sin_port);
snprintf(listen_ring_name, sizeof(listen_ring_name), "listen_rx_ring_%u", remote_port);
- if (!is_dst_ip_localhost(name) || rte_ring_lookup(listen_ring_name) == NULL) {
+ if (is_dst_ip_localhost(name) && rte_ring_lookup(listen_ring_name) == NULL) {
ret = posix_api->connect_fn(s, name, namelen);
- if (ret == 0) {
- return ret;
- }
+ } else {
+ ret = rpc_call_connect(s, name, namelen);
}
- return -1;
+ return ret;
}
static inline int32_t do_listen(int32_t s, int32_t backlog)
@@ -253,7 +302,12 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
return posix_api->listen_fn(s, backlog);
}
- int32_t ret = stack_broadcast_listen(s, backlog);
+ int32_t ret;
+ if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ ret = stack_single_listen(s, backlog);
+ } else {
+ ret = stack_broadcast_listen(s, backlog);
+ }
if (ret != 0) {
return ret;
}
@@ -467,11 +521,7 @@ static inline int32_t do_close(int32_t s)
static int32_t do_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- if (fds == NULL) {
- GAZELLE_RETURN(EINVAL);
- }
-
- if (unlikely(posix_api->ues_posix) || nfds == 0 || !select_thread_path()) {
+ if (unlikely(posix_api->ues_posix) || fds == NULL || nfds == 0 || !select_thread_path()) {
return posix_api->poll_fn(fds, nfds, timeout);
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index c6d6290..2463d3e 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -116,6 +116,7 @@ int32_t dpdk_eal_init(void)
ret = 0;
} else {
LSTACK_PRE_LOG(LSTACK_ERR, "rte_eal_init failed init, rte_errno %d\n", rte_errno);
+ return ret;
}
} else {
LSTACK_PRE_LOG(LSTACK_INFO, "dpdk_eal_init success\n");
@@ -450,6 +451,7 @@ int32_t dpdk_ethdev_init(void)
if (port_id < 0) {
return port_id;
}
+ get_global_cfg_params()->port_id = port_id;
struct rte_eth_dev_info dev_info;
int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
@@ -477,6 +479,9 @@ int32_t dpdk_ethdev_init(void)
stack_group->port_id = eth_params->port_id;
stack_group->rx_offload = eth_params->conf.rxmode.offloads;
stack_group->tx_offload = eth_params->conf.txmode.offloads;
+ /* used for tcp port alloc */
+ stack_group->reta_mask = dev_info.reta_size - 1;
+ stack_group->nb_queues = nb_queues;
if (get_global_cfg_params()->is_primary) {
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
@@ -511,7 +516,6 @@ int32_t dpdk_ethdev_init(void)
rss_setup(port_id, nb_queues);
stack_group->reta_mask = dev_info.reta_size - 1;
}
- stack_group->nb_queues = nb_queues;
}
return 0;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 0d7b7f0..5811b26 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -132,20 +132,28 @@ struct protocol_stack *get_bind_protocol_stack(void)
int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- for (uint16_t i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack* stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv) {
- if (stack->is_send_thread && stack->conn_num < min_conn_num) {
- index = i;
- min_conn_num = stack->conn_num;
- }
- }else {
- if (stack->conn_num < min_conn_num) {
- index = i;
- min_conn_num = stack->conn_num;
+ if (use_ltran() && get_global_cfg_params()->listen_shadow == 0) {
+ static _Atomic uint16_t stack_index = 0;
+ index = atomic_fetch_add(&stack_index, 1);
+ if (index >= stack_group->stack_num) {
+ LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
+ return NULL;
+ }
+ } else {
+ for (uint16_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (stack->is_send_thread && stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
+ }
}
}
-
}
bind_stack = stack_group->stacks[index];
@@ -426,6 +434,8 @@ static struct protocol_stack *stack_thread_init(void *arg)
wait_sem_value(&stack_group->ethdev_init, 1);
}
+ usleep(SLEEP_US_BEFORE_LINK_UP);
+
if (ethdev_init(stack) != 0) {
free(stack);
return NULL;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 92d7a39..489b267 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -51,6 +51,9 @@ uint64_t get_current_time(void)
void calculate_lstack_latency(struct gazelle_stack_latency *stack_latency, const struct pbuf *pbuf,
enum GAZELLE_LATENCY_TYPE type)
{
+ if (pbuf == NULL) {
+ return;
+ }
const uint64_t *priv = (uint64_t *)((uint8_t *)(pbuf) + LATENCY_OFFSET);
if (*priv != ~(*(priv + 1)) || *priv < stack_latency->start_time) {
return;
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 55cf769..25f5b8e 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -26,6 +26,12 @@ enum PACKET_TRANSFER_TYPE{
TRANSFER_CURRENT_THREAD,
};
+enum TRANSFER_MESSAGE_RESULT {
+ CONNECT_ERROR = -2,
+ REPLY_ERROR = -1,
+ TRANSFER_SUCESS = 0,
+};
+
struct protocol_stack;
struct rte_mbuf;
struct lstack_dev_ops {
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 01b1280..5032b5b 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -160,25 +160,23 @@ int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, b
int sockfd;
int ret = 0;
- if ((sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
- return -1;
- }
+ sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
memset(&serun, 0, sizeof(serun));
serun.sun_family = AF_UNIX;
sprintf_s(serun.sun_path, PATH_MAX,"%s%d", server_path, process_index);
int len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0){
- return -1;
+ return CONNECT_ERROR;
}
posix_api->write_fn(sockfd, buf, write_len);
if (need_reply) {
char reply_message[REPLY_LEN];
posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
- ret = 0;
+ ret = TRANSFER_SUCESS;
}else {
- ret = -1;
+ ret = REPLY_ERROR;
}
}
posix_api->close_fn(sockfd);
@@ -291,7 +289,7 @@ void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, u
char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", dst_ip,split_delim, src_port,split_delim,dst_port);
int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS){
LSTACK_LOG(ERR, LSTACK,"transfer_delete_rule_info_to_process0 error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
rte_gettid(), dst_ip, src_port, dst_port);
}
@@ -310,7 +308,7 @@ void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, u
sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
dst_ip,split_delim,src_ip,split_delim, dst_port,split_delim,src_port, split_delim,queue_id,split_delim,process_idx);
int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS){
LSTACK_LOG(ERR, LSTACK,"transfer_create_rule_info_to_process0 error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u, queue_id %u, process_idx %u\n",
rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
}
@@ -321,7 +319,7 @@ void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_
char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, "%u%s%u%s%u", listen_port,split_delim,process_idx, split_delim, is_add);
int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
- if(ret != 0){
+ if(ret != TRANSFER_SUCESS) {
LSTACK_LOG(ERR, LSTACK,"transfer_add_or_delete_listen_port_to_process0 error. tid %d. listen_port %u, process_idx %u\n",
rte_gettid(), listen_port, process_idx);
}
@@ -416,8 +414,10 @@ void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
char arp_mbuf[LSTACK_MBUF_LEN] = {0};
sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
- if(result < 0){
- LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. \n", i);
+ if(result == CONNECT_ERROR){
+ LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
+ }else if (result == REPLY_ERROR) {
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
}
}
}
@@ -616,7 +616,11 @@ int distribute_pakages(struct rte_mbuf *mbuf)
void kni_handle_rx(uint16_t port_id)
{
struct rte_mbuf *pkts_burst[PACKET_READ_SIZE];
- uint32_t nb_kni_rx = rte_kni_rx_burst(get_gazelle_kni(), pkts_burst, PACKET_READ_SIZE);
+ struct rte_kni* kni = get_gazelle_kni();
+ uint32_t nb_kni_rx = 0;
+ if (kni) {
+ nb_kni_rx = rte_kni_rx_burst(kni, pkts_burst, PACKET_READ_SIZE);
+ }
if (nb_kni_rx > 0) {
uint16_t nb_rx = rte_eth_tx_burst(port_id, 0, pkts_burst, nb_kni_rx);
for (uint16_t i = nb_rx; i < nb_kni_rx; ++i) {
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。