代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/lwip 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 8a68ee510f5da20edf7fa06da701713ef10db930 Mon Sep 17 00:00:00 2001
From: jiangheng12 <jiangheng14@huawei.com>
Date: Thu, 16 Mar 2023 19:59:26 +0800
Subject: [PATCH] same node & gazellectl -a
---
src/api/sockets.c | 21 +++++++++++++++++++++
src/core/ipv4/ip4_frag.c | 4 ++++
src/core/netif.c | 7 ++++---
src/core/pbuf.c | 6 ++++++
src/core/tcp.c | 39 +++++++++++++++++++++++++++++++++++++++
src/core/tcp_in.c | 6 ++++++
src/core/tcp_out.c | 11 +++++++++++
src/include/lwip/pbuf.h | 3 +++
src/include/lwip/tcp.h | 10 ++++++++++
src/include/lwipopts.h | 7 +++++++
src/include/lwipsock.h | 37 +++++++++++++++++++++++++++++++++++++
11 files changed, 148 insertions(+), 3 deletions(-)
diff --git a/src/api/sockets.c b/src/api/sockets.c
index 8ef89ab..7b5606f 100644
--- a/src/api/sockets.c
+++ b/src/api/sockets.c
@@ -645,6 +645,10 @@ alloc_socket(struct netconn *newconn, int accepted, int flags)
* (unless it has been created by accept()). */
sockets[i].sendevent = (NETCONNTYPE_GROUP(newconn->type) == NETCONN_TCP ? (accepted != 0) : 1);
sockets[i].errevent = 0;
+ sockets[i].same_node_rx_ring = NULL;
+ sockets[i].same_node_rx_ring_mz = NULL;
+ sockets[i].same_node_tx_ring = NULL;
+ sockets[i].same_node_tx_ring_mz = NULL;
return i + LWIP_SOCKET_OFFSET;
} else {
lwip_close(i);
@@ -756,6 +760,11 @@ free_socket(struct lwip_sock *sock, int is_tcp)
/* Protect socket array */
SYS_ARCH_PROTECT(lev);
+#if GAZELLE_ENABLE
+ /* remove sock from same_node_recv_lit */
+ list_del_node_null(&sock->recv_list);
+#endif
+
freed = free_socket_locked(sock, is_tcp, &conn, &lastdata);
SYS_ARCH_UNPROTECT(lev);
/* don't use 'sock' after this line, as another task might have allocated it */
@@ -819,6 +828,18 @@ lwip_accept4(int s, struct sockaddr *addr, socklen_t *addrlen, int flags)
LWIP_ASSERT("invalid socket index", (newsock >= LWIP_SOCKET_OFFSET) && (newsock < NUM_SOCKETS + LWIP_SOCKET_OFFSET));
#endif /* GAZELLE_ENABLE */
nsock = &sockets[newsock - LWIP_SOCKET_OFFSET];
+#if GAZELLE_ENABLE
+ struct tcp_pcb *pcb = newconn->pcb.tcp;
+ if (pcb->client_rx_ring != NULL && pcb->client_tx_ring != NULL) {
+ if (find_same_node_memzone(pcb, nsock) != 0) {
+ netconn_delete(newconn);
+ free_socket(nsock, 1);
+ sock_set_errno(sock, ENOTCONN);
+ done_socket(sock);
+ return -1;
+ }
+ }
+#endif
/* See event_callback: If data comes in right away after an accept, even
* though the server task might not have created a new socket yet.
diff --git a/src/core/ipv4/ip4_frag.c b/src/core/ipv4/ip4_frag.c
index dadf395..b2462d2 100644
--- a/src/core/ipv4/ip4_frag.c
+++ b/src/core/ipv4/ip4_frag.c
@@ -729,6 +729,7 @@ ip_frag_free_pbuf_custom_ref(struct pbuf_custom_ref *p)
/** Free-callback function to free a 'struct pbuf_custom_ref', called by
* pbuf_free. */
+#if !GAZELLE_ENABLE
static void
ipfrag_free_pbuf_custom(struct pbuf *p)
{
@@ -740,6 +741,7 @@ ipfrag_free_pbuf_custom(struct pbuf *p)
}
ip_frag_free_pbuf_custom_ref(pcr);
}
+#endif
#endif /* !LWIP_NETIF_TX_SINGLE_PBUF */
/**
@@ -851,7 +853,9 @@ ip4_frag(struct pbuf *p, struct netif *netif, const ip4_addr_t *dest)
}
pbuf_ref(p);
pcr->original = p;
+#if !GAZELLE_ENABLE
pcr->pc.custom_free_function = ipfrag_free_pbuf_custom;
+#endif
/* Add it to end of rambuf's chain, but using pbuf_cat, not pbuf_chain
* so that it is removed when pbuf_dechain is later called on rambuf.
diff --git a/src/core/netif.c b/src/core/netif.c
index ded3561..db3c718 100644
--- a/src/core/netif.c
+++ b/src/core/netif.c
@@ -1101,7 +1101,7 @@ netif_set_link_callback(struct netif *netif, netif_status_callback_fn link_callb
}
#endif /* LWIP_NETIF_LINK_CALLBACK */
-#if ENABLE_LOOPBACK
+#if !GAZELLE_ENABLE
/**
* @ingroup netif
* Send an IP packet to be received on the same netif (loopif-like).
@@ -1220,6 +1220,7 @@ netif_loop_output(struct netif *netif, struct pbuf *p)
return ERR_OK;
}
+#endif
#if LWIP_HAVE_LOOPIF
#if LWIP_IPV4
@@ -1241,7 +1242,7 @@ netif_loop_output_ipv6(struct netif *netif, struct pbuf *p, const ip6_addr_t *ad
#endif /* LWIP_IPV6 */
#endif /* LWIP_HAVE_LOOPIF */
-
+#if !GAZELLE_ENABLE
/**
* Call netif_poll() in the main loop of your application. This is to prevent
* reentering non-reentrant functions like tcp_input(). Packets passed to
@@ -1313,6 +1314,7 @@ netif_poll(struct netif *netif)
}
SYS_ARCH_UNPROTECT(lev);
}
+#endif
#if !LWIP_NETIF_LOOPBACK_MULTITHREADING
/**
@@ -1328,7 +1330,6 @@ netif_poll_all(void)
}
}
#endif /* !LWIP_NETIF_LOOPBACK_MULTITHREADING */
-#endif /* ENABLE_LOOPBACK */
#if LWIP_NUM_NETIF_CLIENT_DATA > 0
/**
diff --git a/src/core/pbuf.c b/src/core/pbuf.c
index 112baeb..ab1edff 100644
--- a/src/core/pbuf.c
+++ b/src/core/pbuf.c
@@ -69,6 +69,7 @@
*/
#include "lwip/opt.h"
+#include "lwipsock.h"
#include "lwip/pbuf.h"
#include "lwip/stats.h"
@@ -189,6 +190,7 @@ pbuf_init_alloced_pbuf(struct pbuf *p, void *payload, u16_t tot_len, u16_t len,
p->flags = flags;
p->ref = 1;
p->if_idx = NETIF_NO_INDEX;
+ p->pcb = NULL;
}
/**
@@ -779,9 +781,13 @@ pbuf_free(struct pbuf *p)
#if LWIP_SUPPORT_CUSTOM_PBUF
/* is this a custom pbuf? */
if ((p->flags & PBUF_FLAG_IS_CUSTOM) != 0) {
+#if GAZELLE_ENABLE
+ gazelle_free_pbuf(p);
+#else
struct pbuf_custom *pc = (struct pbuf_custom *)p;
LWIP_ASSERT("pc->custom_free_function != NULL", pc->custom_free_function != NULL);
pc->custom_free_function(p);
+#endif
} else
#endif /* LWIP_SUPPORT_CUSTOM_PBUF */
{
diff --git a/src/core/tcp.c b/src/core/tcp.c
index 9023dde..34e99df 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -116,6 +116,8 @@
#include <string.h>
#include <pthread.h>
+#include "lwipsock.h"
+
#ifdef LWIP_HOOK_FILENAME
#include LWIP_HOOK_FILENAME
#endif
@@ -250,6 +252,18 @@ void
tcp_free(struct tcp_pcb *pcb)
{
#if GAZELLE_ENABLE
+ if (pcb->free_ring == 1) {
+ struct netconn *netconn = NULL;
+ struct lwip_sock *sock = NULL;
+ rte_ring_free(pcb->client_rx_ring);
+ rte_ring_free(pcb->client_tx_ring);
+ netconn = (struct netconn *)pcb->callback_arg;
+ sock = get_socket(netconn->socket);
+ rte_memzone_free(sock->same_node_rx_ring->mz);
+ rte_memzone_free(sock->same_node_rx_ring_mz);
+ rte_memzone_free(sock->same_node_tx_ring->mz);
+ rte_memzone_free(sock->same_node_tx_ring_mz);
+ }
vdev_unreg_done(pcb);
release_port(pcb->local_port);
#endif
@@ -999,6 +1013,15 @@ tcp_listen_with_backlog_and_err(struct tcp_pcb *pcb, u8_t backlog, err_t *err)
/* pcb transfer to lpcb and reg into tcp_listen_pcbs. freeing pcb shouldn't release sock table in here.
* local_port=0 avoid to release sock table in tcp_free */
pcb->local_port = 0;
+
+ char name[RING_NAME_LEN];
+ snprintf(name, sizeof(name), "listen_rx_ring_%u", lpcb->local_port);
+ if (rte_ring_lookup(name) != NULL) {
+ /* port reuse */
+ lpcb->listen_rx_ring = NULL;
+ } else {
+ same_node_ring_create(&lpcb->listen_rx_ring, SAME_NODE_RING_SIZE, lpcb->local_port, "listen", "rx");
+ }
#endif
tcp_free(pcb);
#if LWIP_CALLBACK_API
@@ -1265,6 +1288,16 @@ tcp_connect(struct tcp_pcb *pcb, const ip_addr_t *ipaddr, u16_t port,
#endif /* SO_REUSE */
}
+#if GAZELLE_ENABLE
+ /* communication between processes on the same node */
+ if (ip_addr_cmp(&pcb->local_ip, &pcb->remote_ip)) {
+ ret = create_same_node_ring(pcb);
+ if (ret != 0) {
+ return ret;
+ }
+ }
+#endif
+
iss = tcp_next_iss(pcb);
pcb->rcv_nxt = 0;
pcb->snd_nxt = iss;
@@ -2095,6 +2128,11 @@ tcp_alloc(u8_t prio)
pcb->keep_intvl = TCP_KEEPINTVL_DEFAULT;
pcb->keep_cnt = TCP_KEEPCNT_DEFAULT;
#endif /* LWIP_TCP_KEEPALIVE */
+#if GAZELLE_ENABLE
+ pcb->client_rx_ring = NULL;
+ pcb->client_tx_ring = NULL;
+ pcb->free_ring = 0;
+#endif
pcb_tci_init(pcb);
}
return pcb;
diff --git a/src/core/tcp_in.c b/src/core/tcp_in.c
index 7126eed..a952903 100644
--- a/src/core/tcp_in.c
+++ b/src/core/tcp_in.c
@@ -42,6 +42,7 @@
*/
#include "lwip/opt.h"
+#include "lwipsock.h"
#if LWIP_TCP /* don't build if not configured for use in lwipopts.h */
@@ -809,6 +810,11 @@ tcp_listen_input(struct tcp_pcb_listen *pcb)
#if GAZELLE_ENABLE
vdev_reg_done(REG_RING_TCP_CONNECT, npcb);
+ if (ip_addr_cmp(&npcb->local_ip, &npcb->remote_ip)) {
+ if (find_same_node_ring(npcb) != 0) {
+ return;
+ }
+ }
#endif
/* Parse any options in the SYN. */
diff --git a/src/core/tcp_out.c b/src/core/tcp_out.c
index 17e495d..7aad1b8 100644
--- a/src/core/tcp_out.c
+++ b/src/core/tcp_out.c
@@ -728,6 +728,10 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u16_t len, u8_t apiflags)
goto memerr;
}
+#if GAZELLE_ENABLE
+ lstack_calculate_aggregate(2, p->tot_len);
+#endif
+
if ((seg = tcp_create_segment(pcb, p, 0, pcb->snd_lbb + pos, optflags)) == NULL) {
#if GAZELLE_ENABLE
if (pos > 0) {
@@ -1708,6 +1712,10 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif
int seg_chksum_was_swapped = 0;
#endif
+#if USE_LIBOS
+ lstack_calculate_aggregate(1, seg->len);
+#endif
+
LWIP_ASSERT("tcp_output_segment: invalid seg", seg != NULL);
LWIP_ASSERT("tcp_output_segment: invalid pcb", pcb != NULL);
LWIP_ASSERT("tcp_output_segment: invalid netif", netif != NULL);
@@ -1902,6 +1910,8 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif
PERF_START(PERF_LAYER_IP, PERF_POINT_IP_SEND);
NETIF_SET_HINTS(netif, &(pcb->netif_hints));
+
+ seg->p->pcb = pcb;
err = ip_output_if(seg->p, &pcb->local_ip, &pcb->remote_ip, pcb->ttl,
pcb->tos, IP_PROTO_TCP, netif);
NETIF_RESET_HINTS(netif);
@@ -2238,6 +2248,7 @@ tcp_output_control_segment(struct tcp_pcb *pcb, struct pbuf *p,
{
struct netif *netif;
+ p->pcb = pcb;
LWIP_ASSERT("tcp_output_control_segment: invalid pbuf", p != NULL);
if (pcb == NULL || pcb->pcb_if == NULL) {
diff --git a/src/include/lwip/pbuf.h b/src/include/lwip/pbuf.h
index ae8e5e7..46ff26a 100644
--- a/src/include/lwip/pbuf.h
+++ b/src/include/lwip/pbuf.h
@@ -235,6 +235,7 @@ struct pbuf {
u8_t head;
struct pbuf *last;
pthread_spinlock_t pbuf_lock;
+ struct tcp_pcb *pcb;
#endif /* GAZELLE_ENABLE CHECKSUM_OFFLOAD_SWITCH */
/** In case the user needs to store data custom data on a pbuf */
@@ -263,7 +264,9 @@ struct pbuf_custom {
/** The actual pbuf */
struct pbuf pbuf;
/** This function is called when pbuf_free deallocates this pbuf(_custom) */
+#if !GAZELLE_ENABLE
pbuf_free_custom_fn custom_free_function;
+#endif
};
#endif /* LWIP_SUPPORT_CUSTOM_PBUF */
diff --git a/src/include/lwip/tcp.h b/src/include/lwip/tcp.h
index edfdb68..f968441 100644
--- a/src/include/lwip/tcp.h
+++ b/src/include/lwip/tcp.h
@@ -260,6 +260,9 @@ struct tcp_pcb_listen {
u8_t master_lpcb;
#endif
+#if GAZELLE_ENABLE
+ struct rte_ring *listen_rx_ring;
+#endif
};
@@ -417,6 +420,13 @@ struct tcp_pcb {
u8_t rcv_scale;
#endif
+#if GAZELLE_ENABLE
+#define SAME_NODE_RING_SIZE 512
+ struct rte_ring *client_rx_ring;
+ struct rte_ring *client_tx_ring;
+ u8_t free_ring;
+#endif
+
u8_t need_tso_send;
};
diff --git a/src/include/lwipopts.h b/src/include/lwipopts.h
index 9f8c923..5b6bf6e 100644
--- a/src/include/lwipopts.h
+++ b/src/include/lwipopts.h
@@ -227,4 +227,11 @@
#define SIOCSHIWAT 1
+/*
+ ------------------------------------
+ ---------- Netif options ----------
+ ------------------------------------
+*/
+#define LWIP_NETIF_LOOPBACK 1
+
#endif /* __LWIPOPTS_H__ */
diff --git a/src/include/lwipsock.h b/src/include/lwipsock.h
index 7e16ec8..f917d8a 100644
--- a/src/include/lwipsock.h
+++ b/src/include/lwipsock.h
@@ -65,7 +65,19 @@ struct protocol_stack;
struct wakeup_poll;
struct rte_ring;
#include <rte_common.h>
+#include <rte_memzone.h>
+
+// 8M
+#define SAME_NODE_RING_LEN (unsigned long long)(8388608)
+#define SAME_NODE_RING_MASK (unsigned long long)(8388608 - 1)
+#define RING_NAME_LEN 32
+struct same_node_ring {
+ const struct rte_memzone *mz;
+ unsigned long long sndbegin;
+ unsigned long long sndend;
+};
#endif
+
/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
/** sockets currently are built on netconns, each socket has one netconn */
@@ -120,9 +132,25 @@ struct lwip_sock {
struct protocol_stack *stack;
struct rte_ring *recv_ring;
struct rte_ring *send_ring;
+
+ /* same node send data ring */
+ struct same_node_ring *same_node_rx_ring;
+ const struct rte_memzone *same_node_rx_ring_mz;
+ struct same_node_ring *same_node_tx_ring;
+ const struct rte_memzone *same_node_tx_ring_mz;
#endif
};
+#if GAZELLE_ENABLE
+static inline unsigned same_node_ring_count(struct lwip_sock *sock)
+{
+ const unsigned long long cur_begin = __atomic_load_n(&sock->same_node_rx_ring->sndbegin, __ATOMIC_RELAXED);
+ const unsigned long long cur_end = __atomic_load_n(&sock->same_node_rx_ring->sndend, __ATOMIC_RELAXED);
+
+ return cur_end - cur_begin;
+}
+#endif
+
#ifndef set_errno
#define set_errno(err) do { if (err) { errno = (err); } } while(0)
#endif
@@ -142,6 +170,15 @@ extern struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size
extern void gazelle_init_sock(int32_t fd);
extern void gazelle_clean_sock(int32_t fd);
extern void write_lwip_over(struct lwip_sock *sock);
+extern void netif_poll(struct netif *netif);
+extern err_t netif_loop_output(struct netif *netif, struct pbuf *p);
+extern err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock);
+extern err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *);
+extern err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx);
+extern err_t create_same_node_ring(struct tcp_pcb *pcb);
+extern err_t find_same_node_ring(struct tcp_pcb *pcb);
+extern void gazelle_free_pbuf(struct pbuf *pbuf);
+extern void lstack_calculate_aggregate(int type, uint32_t len);
#endif /* GAZELLE_ENABLE */
struct lwip_sock *get_socket(int s);
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。