代码拉取完成,页面将自动刷新
同步操作将从 compile_success/gazelle_kylin_src 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From f0e65d55ace8b4e5c1bd2023f1b62da181f421c5 Mon Sep 17 00:00:00 2001
From: jiangheng14 <jiangheng14@huawei.com>
Date: Thu, 7 Jul 2022 21:31:20 +0800
Subject: [PATCH 6/6] refactor-pkt-read-send-performance
---
src/common/dpdk_common.h | 145 +++++++
src/common/gazelle_dfx_msg.h | 50 ++-
src/lstack/api/lstack_epoll.c | 321 ++++++--------
src/lstack/api/lstack_wrap.c | 15 +-
src/lstack/core/lstack_cfg.c | 18 +
src/lstack/core/lstack_control_plane.c | 4 +-
src/lstack/core/lstack_dpdk.c | 32 +-
src/lstack/core/lstack_init.c | 5 +-
src/lstack/core/lstack_lwip.c | 477 ++++++++++-----------
src/lstack/core/lstack_protocol_stack.c | 276 ++++++------
src/lstack/core/lstack_stack_stat.c | 80 +++-
src/lstack/core/lstack_thread_rpc.c | 85 ++--
src/lstack/include/lstack_cfg.h | 1 +
src/lstack/include/lstack_dpdk.h | 5 +-
src/lstack/include/lstack_lockless_queue.h | 10 +-
src/lstack/include/lstack_lwip.h | 11 +-
src/lstack/include/lstack_protocol_stack.h | 31 +-
src/lstack/include/lstack_stack_stat.h | 3 +
src/lstack/include/lstack_thread_rpc.h | 5 +-
src/lstack/include/posix/lstack_epoll.h | 22 +-
src/lstack/lstack.conf | 1 +
src/lstack/netif/lstack_ethdev.c | 9 +-
src/lstack/netif/lstack_vdev.c | 14 +-
src/ltran/ltran_dfx.c | 52 ++-
src/ltran/ltran_forward.c | 18 +-
src/ltran/ltran_stat.c | 19 +-
26 files changed, 911 insertions(+), 798 deletions(-)
diff --git a/src/common/dpdk_common.h b/src/common/dpdk_common.h
index 595e85f..4a7bd37 100644
--- a/src/common/dpdk_common.h
+++ b/src/common/dpdk_common.h
@@ -14,6 +14,7 @@
#define __GAZELLE_DPDK_COMMON_H__
#include <rte_mbuf.h>
+#include <rte_ring.h>
#define GAZELLE_KNI_NAME "kni" // will be removed during dpdk update
@@ -35,6 +36,7 @@ static __rte_always_inline void copy_mbuf(struct rte_mbuf *dst, struct rte_mbuf
return;
dst->ol_flags = src->ol_flags;
+ dst->tx_offload = src->tx_offload;
// there is buf_len in rx_descriptor_fields1, copy it is dangerous acturely. 16 : mbuf desc size
rte_memcpy((uint8_t *)dst->rx_descriptor_fields1, (const uint8_t *)src->rx_descriptor_fields1, 16);
@@ -65,4 +67,147 @@ int32_t dpdk_kni_init(uint16_t port, struct rte_mempool *pool);
int32_t kni_process_tx(struct rte_mbuf **pkts_burst, uint32_t count);
void kni_process_rx(uint16_t port);
+/*
+ gazelle custom rte ring interface
+ lightweight ring reduce atomic and smp_mb.
+ only surpport single-consumers or the single-consumer.
+ */
+static __rte_always_inline uint32_t gazelle_light_ring_enqueue_busrt(struct rte_ring *r, void **obj_table, uint32_t n)
+{
+ uint32_t cons = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+ uint32_t prod = r->prod.tail;
+ uint32_t free_entries = r->capacity + cons - prod;
+
+ if (n > free_entries) {
+ return 0;
+ }
+
+ __rte_ring_enqueue_elems(r, prod, obj_table, sizeof(void *), n);
+
+ __atomic_store_n(&r->prod.tail, prod + n, __ATOMIC_RELEASE);
+
+ return n;
+}
+
+static __rte_always_inline uint32_t gazelle_light_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n)
+{
+ uint32_t prod = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
+ uint32_t cons = r->cons.tail;
+ uint32_t entries = prod - cons;
+
+ if (n > entries) {
+ n = entries;
+ }
+
+ if (n == 0) {
+ return 0;
+ }
+
+ __rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n);
+
+ __atomic_store_n(&r->cons.tail, cons + n, __ATOMIC_RELEASE);
+
+ return n;
+}
+
+/*
+ gazelle custom rte ring interface
+ one thread enqueue and dequeue, other thread read object use and object still in queue.
+ so malloc and free in same thread. only surpport single-consumers or the single-consumer.
+
+ cons.tail prod.tail prod.head cons.head
+ gazelle_ring_sp_enqueue: cons.head-->> cons.tal, enqueue object
+ gazelle_ring_sc_dequeue: cons.tal -->> prod.tail, dequeue object
+ gazelle_ring_read: prod.tail-->> cons.head, read object, prod.head = prod.tail + N
+ gazelle_ring_read_over: prod.tail = prod.head, update prod.tail
+ */
+static __rte_always_inline uint32_t gazelle_ring_sp_enqueue(struct rte_ring *r, void **obj_table, uint32_t n)
+{
+ uint32_t head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
+ uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+
+ uint32_t entries = r->capacity + tail - head;
+ if (n > entries) {
+ return 0;
+ }
+
+
+ __rte_ring_enqueue_elems(r, head, obj_table, sizeof(void *), n);
+
+ __atomic_store_n(&r->cons.head, head + n, __ATOMIC_RELEASE);
+
+ return n;
+}
+
+static __rte_always_inline uint32_t gazelle_ring_sc_dequeue(struct rte_ring *r, void **obj_table, uint32_t n)
+{
+ uint32_t cons = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+ uint32_t prod = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
+
+ uint32_t entries = prod - cons;
+ if (n > entries) {
+ n = entries;
+ }
+ if (unlikely(n == 0)) {
+ return 0;
+ }
+
+
+ __rte_ring_dequeue_elems(r, cons, obj_table, sizeof(void *), n);
+
+ __atomic_store_n(&r->cons.tail, cons + n, __ATOMIC_RELEASE);
+
+ return n;
+}
+
+static __rte_always_inline uint32_t gazelle_ring_read(struct rte_ring *r, void **obj_table, uint32_t n)
+{
+ uint32_t cons = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
+ uint32_t prod = r->prod.head;
+
+ const uint32_t entries = cons - prod;
+ if (n > entries) {
+ n = entries;
+ }
+ if (unlikely(n == 0)) {
+ return 0;
+ }
+
+ __rte_ring_dequeue_elems(r, prod, obj_table, sizeof(void *), n);
+
+ r->prod.head = prod + n;
+
+ return n;
+}
+
+static __rte_always_inline void gazelle_ring_read_n(struct rte_ring *r, uint32_t n)
+{
+ __atomic_store_n(&r->prod.tail, r->prod.tail + n, __ATOMIC_RELEASE);
+}
+
+static __rte_always_inline void gazelle_ring_read_over(struct rte_ring *r)
+{
+ __atomic_store_n(&r->prod.tail, r->prod.head, __ATOMIC_RELEASE);
+}
+
+static __rte_always_inline uint32_t gazelle_ring_readover_count(struct rte_ring *r)
+{
+ rte_smp_rmb();
+ return r->prod.tail - r->cons.tail;
+}
+static __rte_always_inline uint32_t gazelle_ring_readable_count(const struct rte_ring *r)
+{
+ rte_smp_rmb();
+ return r->cons.head - r->prod.tail;
+}
+
+static __rte_always_inline uint32_t gazelle_ring_count(const struct rte_ring *r)
+{
+ rte_smp_rmb();
+ return r->cons.head - r->cons.tail;
+}
+static __rte_always_inline uint32_t gazelle_ring_free_count(const struct rte_ring *r)
+{
+ return r->capacity - gazelle_ring_count(r);
+}
#endif
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index 6db67ee..cf435cd 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -57,34 +57,37 @@ enum GAZELLE_LATENCY_TYPE {
GAZELLE_LATENCY_READ,
};
-struct gazelle_stat_pkts {
- uint64_t tx;
- uint64_t rx;
- uint64_t tx_drop;
- uint64_t rx_drop;
- uint64_t rx_allocmbuf_fail;
- uint64_t tx_allocmbuf_fail;
- uint64_t call_msg_cnt;
- uint16_t conn_num;
- uint16_t send_idle_ring_cnt;
- uint64_t event_list;
+struct gazelle_stack_stat {
+ uint64_t wakeup_events;
+ uint64_t write_lwip_cnt;
+ uint64_t send_self_rpc;
uint64_t read_lwip_drop;
uint64_t read_lwip_cnt;
- uint64_t write_lwip_drop;
- uint64_t write_lwip_cnt;
+ uint64_t rx_allocmbuf_fail;
+ uint64_t tx_allocmbuf_fail;
+ uint64_t call_null;
+ uint64_t rx_drop;
+ uint64_t rx;
+ uint64_t tx_drop;
+ uint64_t tx;
+};
+
+struct gazelle_wakeup_stat {
+ uint64_t app_events;
+ uint64_t app_write_idlefail;
uint64_t app_write_cnt;
uint64_t app_read_cnt;
- uint64_t app_write_idlefail;
- uint64_t app_write_drop;
- uint64_t recv_list;
- uint64_t wakeup_events;
- uint64_t app_events;
- uint64_t call_alloc_fail;
uint64_t read_null;
- uint64_t call_null;
- uint64_t arp_copy_fail;
- uint64_t send_self_rpc;
- uint64_t send_list;
+};
+
+struct gazelle_stat_pkts {
+ uint64_t call_msg_cnt;
+ uint16_t conn_num;
+ uint64_t recv_list_cnt;
+ uint64_t call_alloc_fail;
+ uint64_t send_list_cnt;
+ struct gazelle_stack_stat stack_stat;
+ struct gazelle_wakeup_stat wakeup_stat;
};
/* same as define in lwip/stats.h - struct stats_mib2 */
@@ -159,6 +162,7 @@ struct gazelle_stat_lstack_conn_info {
uint32_t recv_ring_cnt;
uint32_t tcp_sub_state;
int32_t sem_cnt;
+ int32_t fd;
};
struct gazelle_stat_lstack_conn {
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index cba67ea..4978f02 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -16,6 +16,7 @@
#include <time.h>
#include <poll.h>
#include <stdatomic.h>
+#include <pthread.h>
#include <lwip/lwipsock.h>
#include <lwip/sockets.h>
@@ -39,36 +40,33 @@
#define SEC_TO_NSEC 1000000000
#define SEC_TO_MSEC 1000
#define MSEC_TO_NSEC 1000000
-#define EPOLL_MAX_EVENTS 512
#define POLL_KERNEL_EVENTS 32
-static PER_THREAD struct wakeup_poll g_wakeup_poll = {0};
-static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */
-
void add_epoll_event(struct netconn *conn, uint32_t event)
{
/* conn sock nerver null, because lwip call this func */
- struct lwip_sock *sock = get_socket(conn->socket);
-
- if ((event & sock->epoll_events) == 0) {
+ struct lwip_sock *sock = get_socket_by_fd(conn->socket);
+ if (sock->wakeup == NULL || (event & sock->epoll_events) == 0) {
return;
}
+ struct wakeup_poll *wakeup = sock->wakeup;
+ struct protocol_stack *stack = sock->stack;
- sock->events |= event & sock->epoll_events;
-
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
- if (g_use_epoll && list_is_empty(&sock->event_list)) {
- list_add_node(&sock->stack->event_list, &sock->event_list);
+ if (wakeup->type == WAKEUP_EPOLL) {
+ pthread_spin_lock(&wakeup->event_list_lock);
+ sock->events |= (event == EPOLLERR) ? (EPOLLIN | EPOLLERR) : (event & sock->epoll_events);
+ if (list_is_null(&sock->event_list)) {
+ list_add_node(&wakeup->event_list, &sock->event_list);
+ }
+ pthread_spin_unlock(&wakeup->event_list_lock);
}
-#endif
- if (sock->wakeup) {
- sock->stack->stats.wakeup_events++;
- if (get_protocol_stack_group()->wakeup_enable) {
- rte_ring_sp_enqueue(sock->stack->wakeup_ring, &sock->wakeup->event_sem);
- } else {
- sem_post(&sock->wakeup->event_sem);
- }
+ stack->stats.wakeup_events++;
+ sem_t *sem = &wakeup->event_sem;
+ if (get_protocol_stack_group()->wakeup_enable) {
+ gazelle_light_ring_enqueue_busrt(stack->wakeup_ring, (void **)&sem, 1);
+ } else {
+ sem_post(sem);
}
}
@@ -77,61 +75,34 @@ static inline uint32_t update_events(struct lwip_sock *sock)
uint32_t event = 0;
if (sock->epoll_events & EPOLLIN) {
- if (sock->attach_fd > 0 && NETCONN_IS_ACCEPTIN(sock)) {
- event |= EPOLLIN;
- }
-
- if (sock->attach_fd < 0 && NETCONN_IS_DATAIN(sock)) {
+ if (NETCONN_IS_DATAIN(sock) || NETCONN_IS_ACCEPTIN(sock)) {
event |= EPOLLIN;
}
}
if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) {
- event |= EPOLLOUT;
+ /* lwip_netconn_do_connected set LIBOS FLAGS when connected */
+ if (sock->conn && CONN_TYPE_IS_LIBOS(sock->conn)) {
+ event |= EPOLLOUT;
+ }
}
- if ((sock->epoll_events & EPOLLERR) && (sock->events & EPOLLERR)) {
+ if (sock->errevent > 0) {
event |= EPOLLERR | EPOLLIN;
}
return event;
}
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
-void update_stack_events(struct protocol_stack *stack)
+static void raise_pending_events(struct wakeup_poll *wakeup, struct lwip_sock *sock)
{
- if (!g_use_epoll) {
- return;
- }
-
- struct list_node *node, *temp;
- list_for_each_safe(node, temp, &stack->event_list) {
- struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
-
- sock->events = update_events(sock);
- if (sock->events != 0) {
- continue;
- }
-
- if (pthread_spin_trylock(&stack->event_lock)) {
- continue;
+ sock->events = update_events(sock);
+ if (sock->events) {
+ pthread_spin_lock(&wakeup->event_list_lock);
+ if (list_is_null(&sock->event_list)) {
+ list_add_node(&wakeup->event_list, &sock->event_list);
}
- list_del_node_init(&sock->event_list);
- pthread_spin_unlock(&stack->event_lock);
- }
-}
-#endif
-
-static void raise_pending_events(struct lwip_sock *sock)
-{
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket_by_fd(sock->attach_fd) : sock;
- if (attach_sock == NULL) {
- return;
- }
-
- attach_sock->events = update_events(attach_sock);
- if (attach_sock->events & attach_sock->epoll_events) {
- rpc_call_addevent(attach_sock->stack, attach_sock);
+ pthread_spin_unlock(&wakeup->event_list_lock);
}
}
@@ -157,11 +128,15 @@ int32_t lstack_epoll_create(int32_t size)
memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll));
init_list_node(&wakeup->event_list);
- wakeup->epollfd = fd;
sem_init(&wakeup->event_sem, 0, 0);
+ pthread_spin_init(&wakeup->event_list_lock, PTHREAD_PROCESS_PRIVATE);
+
+ wakeup->type = WAKEUP_EPOLL;
+ wakeup->epollfd = fd;
sock->wakeup = wakeup;
- g_use_epoll = true;
+ register_wakeup(wakeup);
+
return fd;
}
@@ -176,6 +151,9 @@ int32_t lstack_epoll_close(int32_t fd)
}
if (sock->wakeup) {
+ unregister_wakeup(sock->wakeup);
+ sem_destroy(&sock->wakeup->event_sem);
+ pthread_spin_destroy(&sock->wakeup->event_list_lock);
free(sock->wakeup);
}
sock->wakeup = NULL;
@@ -236,161 +214,116 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
- epoll_sock->wakeup->have_kernel_fd = true;
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
if (CONN_TYPE_HAS_HOST(sock->conn)) {
- epoll_sock->wakeup->have_kernel_fd = true;
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
if (ret < 0) {
- return ret;
+ LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d\n", fd, epfd, op);
}
}
+ struct wakeup_poll *wakeup = epoll_sock->wakeup;
do {
switch (op) {
case EPOLL_CTL_ADD:
- sock->wakeup = epoll_sock->wakeup;
- if (sock->stack) {
- epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]++;
- }
- if (list_is_empty(&sock->event_list)) {
- list_add_node(&sock->wakeup->event_list, &sock->event_list);
- }
+ sock->wakeup = wakeup;
+ wakeup->stack_fd_cnt[sock->stack->queue_id]++;
/* fall through */
case EPOLL_CTL_MOD:
sock->epoll_events = event->events | EPOLLERR | EPOLLHUP;
sock->ep_data = event->data;
- if (sock->conn && NETCONNTYPE_GROUP(netconn_type(sock->conn)) == NETCONN_TCP) {
- raise_pending_events(sock);
- }
+ raise_pending_events(wakeup, sock);
break;
case EPOLL_CTL_DEL:
- list_del_node_init(&sock->event_list);
sock->epoll_events = 0;
- if (sock->stack) {
- epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]--;
- }
+ wakeup->stack_fd_cnt[sock->stack->queue_id]--;
+ pthread_spin_lock(&wakeup->event_list_lock);
+ list_del_node_null(&sock->event_list);
+ pthread_spin_unlock(&wakeup->event_list_lock);
break;
default:
GAZELLE_RETURN(EINVAL);
}
- fd = sock->nextfd;
- sock = get_socket(fd);
- } while (fd > 0 && sock != NULL);
+ sock = sock->listen_next;
+ } while (sock != NULL);
- update_epoll_max_stack(epoll_sock->wakeup);
+ update_epoll_max_stack(wakeup);
return 0;
}
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
-static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
+static void del_node_array(struct epoll_event *events, int32_t event_num, int32_t del_index)
{
- int32_t event_num = 0;
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
- maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
- for (uint32_t i = 0; i < stack_group->stack_num && event_num < maxevents; i++) {
- struct protocol_stack *stack = stack_group->stacks[i];
- int32_t start_event_num = event_num;
-
- if (pthread_spin_trylock(&stack->event_lock)) {
- continue;
- }
-
- struct list_node *node, *temp;
- list_for_each_safe(node, temp, &stack->event_list) {
- struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
-
- uint32_t event = sock->events & sock->epoll_events;
- if (event == 0 || sock->wait_close) {
- continue;
- }
-
- events[event_num].events = event;
- events[event_num].data = sock->ep_data;
- event_num++;
+ for (int32_t i = del_index; i + 1 < event_num; i++) {
+ events[i] = events[i + 1];
+ }
+}
- if (event_num >= maxevents) {
- break;
+static int32_t del_duplicate_event(struct epoll_event *events, int32_t event_num)
+{
+ for (int32_t i = 0; i < event_num; i++) {
+ for (int32_t j = i + 1; j < event_num; j++) {
+ if (events[i].data.u64 == events[j].data.u64) {
+ del_node_array(events, event_num, j);
+ event_num--;
}
}
-
- pthread_spin_unlock(&stack->event_lock);
-
- __sync_fetch_and_add(&stack->stats.app_events, event_num - start_event_num);
}
return event_num;
}
-#else
+
static int32_t epoll_lwip_event(struct wakeup_poll *wakeup, struct epoll_event *events, uint32_t maxevents)
{
int32_t event_num = 0;
struct list_node *node, *temp;
+ int32_t accept_num = 0;
+
+ pthread_spin_lock(&wakeup->event_list_lock);
+
list_for_each_safe(node, temp, &wakeup->event_list) {
struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
- if (sock->conn == NULL) {
- list_del_node_init(&sock->event_list);
- continue;
+ if (sock->conn && sock->conn->acceptmbox) {
+ accept_num++;
}
- struct lwip_sock *temp_sock = sock;
- do {
- struct lwip_sock *attach_sock = (temp_sock->attach_fd > 0) ? get_socket(temp_sock->attach_fd) : temp_sock;
- if (attach_sock == NULL || temp_sock->wait_close) {
- temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
- continue;
- }
+ events[event_num].events = sock->events;
+ events[event_num].data = sock->ep_data;
+ event_num++;
- uint32_t event = update_events(attach_sock);
- if (event != 0) {
- events[event_num].events = event;
- events[event_num].data = temp_sock->ep_data;
- event_num++;
- if (event_num >= maxevents) {
- break;
- }
- }
+ if (event_num >= maxevents) {
+ break;
+ }
+ }
- temp_sock = (temp_sock->nextfd > 0) ? get_socket(temp_sock->nextfd) : NULL;
- } while (temp_sock);
+ pthread_spin_unlock(&wakeup->event_list_lock);
+
+ if (accept_num > 1) {
+ event_num = del_duplicate_event(events, event_num);
}
+ // atomic_fetch_add(&wakeup->bind_stack->stats.app_events, event_num);
return event_num;
}
-#endif
static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
{
int32_t event_num = 0;
for (uint32_t i = 0; i < nfds; i++) {
- /* listenfd nextfd pointerto next stack listen, others nextfd=-1 */
+ /* sock->listen_next pointerto next stack listen */
int32_t fd = fds[i].fd;
- while (fd > 0) {
- struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- break;
- }
-
- /* attach listen is empty, all event in attached listen. attached listen attach_fd is self */
- struct lwip_sock *attach_sock = (sock->attach_fd > 0) ? get_socket(sock->attach_fd) : sock;
- if (attach_sock == NULL || sock->wait_close) {
- fd = sock->nextfd;
- continue;
- }
-
- uint32_t events = update_events(attach_sock);
+ struct lwip_sock *sock = get_socket_by_fd(fd);
+ while (sock && sock->conn) {
+ uint32_t events = update_events(sock);
if (events) {
fds[i].revents = events;
- __sync_fetch_and_add(&sock->stack->stats.app_events, 1);
event_num++;
break;
}
- fd = sock->nextfd;
+ sock = sock->listen_next;;
}
}
@@ -417,7 +350,7 @@ static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct prot
/* avoid kernel thread post too much, use EPOLLET */
struct epoll_event event;
- event.data.ptr = &wakeup->event_sem;
+ event.data.ptr = wakeup;
event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET;
if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) {
LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
@@ -457,15 +390,18 @@ int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxe
do {
event_num += epoll_lwip_event(sock->wakeup, &events[event_num], maxevents - event_num);
+ sock->wakeup->stat.app_events += event_num;
- if (sock->wakeup->have_kernel_fd) {
+ if (__atomic_load_n(&sock->wakeup->have_kernel_event, __ATOMIC_RELAXED)) {
event_num += posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
}
if (event_num > 0) {
+ while (sem_trywait(&sock->wakeup->event_sem) == 0);
break;
}
+ sock->wakeup->have_kernel_event = false;
if (timeout < 0) {
ret = sem_wait(&sock->wakeup->event_sem);
} else {
@@ -479,6 +415,7 @@ int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxe
static void init_poll_wakeup_data(struct wakeup_poll *wakeup)
{
sem_init(&wakeup->event_sem, 0, 0);
+ wakeup->type = WAKEUP_POLL;
wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
if (wakeup->last_fds == NULL) {
@@ -542,11 +479,6 @@ static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struc
if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) {
LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
}
-
- wakeup->last_fds[index].fd = new_fd->fd;
- wakeup->last_fds[index].events = new_fd->events;
-
- wakeup->have_kernel_fd = true;
}
static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds)
@@ -554,17 +486,17 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
if (!wakeup->init) {
wakeup->init = true;
init_poll_wakeup_data(wakeup);
- } else {
- while (sem_trywait(&wakeup->event_sem) == 0) {}
- }
-
- if (nfds > wakeup->last_max_nfds) {
- resize_kernel_poll(wakeup, nfds);
+ register_wakeup(wakeup);
}
int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
int32_t poll_change = 0;
+ /* poll fds num more, recalloc fds size */
+ if (nfds > wakeup->last_max_nfds) {
+ resize_kernel_poll(wakeup, nfds);
+ poll_change = 1;
+ }
/* poll fds num less, del old fd */
for (uint32_t i = nfds; i < wakeup->last_nfds; i++) {
update_kernel_poll(wakeup, i, NULL);
@@ -572,44 +504,51 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
}
for (uint32_t i = 0; i < nfds; i++) {
+ int32_t fd = fds[i].fd;
fds[i].revents = 0;
+ struct lwip_sock *sock = get_socket_by_fd(fd);
- if (fds[i].fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
- continue;
+ if (fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
+ /* fd close then socket may get same fd. */
+ if (sock == NULL || sock->wakeup != NULL) {
+ continue;
+ }
}
+ wakeup->last_fds[i].fd = fd;
+ wakeup->last_fds[i].events = fds[i].events;
poll_change = 1;
- int32_t fd = fds[i].fd;
- struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
+ if (sock == NULL || sock->conn == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
update_kernel_poll(wakeup, i, fds + i);
}
- do {
- sock = get_socket(fd);
- if (sock == NULL || sock->conn == NULL) {
- break;
+ while (sock && sock->conn) {
+ if (sock->epoll_events != (fds[i].events | POLLERR)) {
+ sock->epoll_events = fds[i].events | POLLERR;
+ }
+ if (sock->wakeup != wakeup) {
+ sock->wakeup = wakeup;
}
- sock->epoll_events = fds[i].events | POLLERR;
- sock->wakeup = wakeup;
- /* listenfd list */
- fd = sock->nextfd;
stack_count[sock->stack->queue_id]++;
- } while (fd > 0);
+ /* listenfd list */
+ sock = sock->listen_next;
+ }
}
- wakeup->last_nfds = nfds;
if (poll_change == 0) {
return;
}
+ wakeup->last_nfds = nfds;
poll_bind_statck(wakeup, stack_count);
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- poll_init(&g_wakeup_poll, fds, nfds);
+ static PER_THREAD struct wakeup_poll wakeup_poll = {0};
+
+ poll_init(&wakeup_poll, fds, nfds);
int32_t event_num = 0;
int32_t ret;
@@ -624,23 +563,25 @@ int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
event_num += poll_lwip_event(fds, nfds);
/* reduce syscall epoll_wait */
- if (g_wakeup_poll.have_kernel_fd) {
- int32_t kernel_num = posix_api->epoll_wait_fn(g_wakeup_poll.epollfd, g_wakeup_poll.events, nfds, 0);
+ if (__atomic_load_n(&wakeup_poll.have_kernel_event, __ATOMIC_RELAXED)) {
+ int32_t kernel_num = posix_api->epoll_wait_fn(wakeup_poll.epollfd, wakeup_poll.events, nfds, 0);
for (int32_t i = 0; i < kernel_num; i++) {
- uint32_t index = g_wakeup_poll.events[i].data.u32;
- fds[index].revents = g_wakeup_poll.events[i].events;
+ uint32_t index = wakeup_poll.events[i].data.u32;
+ fds[index].revents = wakeup_poll.events[i].events;
}
event_num += kernel_num >= 0 ? kernel_num : 0;
}
if (event_num > 0) {
+ while (sem_trywait(&wakeup_poll.event_sem) == 0);
break;
}
+ wakeup_poll.have_kernel_event = false;
if (timeout < 0) {
- ret = sem_wait(&g_wakeup_poll.event_sem);
+ ret = sem_wait(&wakeup_poll.event_sem);
} else {
- ret = sem_timedwait(&g_wakeup_poll.event_sem, &poll_time);
+ ret = sem_timedwait(&wakeup_poll.event_sem, &poll_time);
}
} while (ret == 0);
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index bf5dcb4..ec68d62 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -52,7 +52,7 @@ static inline enum KERNEL_LWIP_PATH select_path(int fd)
return PATH_KERNEL;
}
- if (unlikely(posix_api->is_chld)) {
+ if (unlikely(posix_api->ues_posix)) {
return PATH_KERNEL;
}
@@ -84,7 +84,7 @@ static inline int32_t do_epoll_create(int32_t size)
return posix_api->epoll_create_fn(size);
}
- if (unlikely(posix_api->is_chld)) {
+ if (unlikely(posix_api->ues_posix)) {
return posix_api->epoll_create_fn(size);
}
@@ -93,7 +93,7 @@ static inline int32_t do_epoll_create(int32_t size)
static inline int32_t do_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event* event)
{
- if (unlikely(posix_api->is_chld)) {
+ if (unlikely(posix_api->ues_posix)) {
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
@@ -102,7 +102,7 @@ static inline int32_t do_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct
static inline int32_t do_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
{
- if (unlikely(posix_api->is_chld)) {
+ if (unlikely(posix_api->ues_posix)) {
return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
}
@@ -203,7 +203,8 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
return posix_api->listen_fn(s, backlog);
}
- int32_t ret = stack_broadcast_listen(s, backlog);
+ int32_t ret = get_global_cfg_params()->listen_shadow ? stack_broadcast_listen(s, backlog) :
+ stack_single_listen(s, backlog);
if (ret != 0) {
return ret;
}
@@ -264,7 +265,7 @@ static inline int32_t do_setsockopt(int32_t s, int32_t level, int32_t optname, c
static inline int32_t do_socket(int32_t domain, int32_t type, int32_t protocol)
{
if ((domain != AF_INET && domain != AF_UNSPEC)
- || posix_api->is_chld) {
+ || posix_api->ues_posix) {
return posix_api->socket_fn(domain, type, protocol);
}
@@ -368,7 +369,7 @@ static int32_t do_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
GAZELLE_RETURN(EINVAL);
}
- if (unlikely(posix_api->is_chld) || nfds == 0) {
+ if (unlikely(posix_api->ues_posix) || nfds == 0) {
return posix_api->poll_fn(fds, nfds, timeout);
}
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 13086a3..ca2b979 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -56,6 +56,7 @@ static int32_t parse_devices(void);
static int32_t parse_dpdk_args(void);
static int32_t parse_gateway_addr(void);
static int32_t parse_kni_switch(void);
+static int32_t parse_listen_shadow(void);
struct config_vector_t {
const char *name;
@@ -73,6 +74,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "num_wakeup", parse_wakeup_cpu_number },
{ "low_power_mode", parse_low_power_mode },
{ "kni_switch", parse_kni_switch },
+ { "listen_shadow", parse_listen_shadow },
{ NULL, NULL }
};
@@ -670,6 +672,22 @@ static int32_t parse_use_ltran(void)
return 0;
}
+static int32_t parse_listen_shadow(void)
+{
+ const config_setting_t *arg = NULL;
+
+ arg = config_lookup(&g_config, "listen_shadow");
+ if (arg == NULL) {
+ g_config_params.listen_shadow = false;
+ return 0;
+ }
+
+ int32_t val = config_setting_get_int(arg);
+ g_config_params.listen_shadow = (val == 0) ? false : true;
+
+ return 0;
+}
+
static int32_t parse_kni_switch(void)
{
const config_setting_t *arg = NULL;
diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c
index 26a1b1c..ef38fb5 100644
--- a/src/lstack/core/lstack_control_plane.c
+++ b/src/lstack/core/lstack_control_plane.c
@@ -713,7 +713,7 @@ void control_server_thread(void *arg)
struct epoll_event evt_array;
while (1) {
/* wait init finish */
- if (posix_api->is_chld) {
+ if (posix_api->ues_posix) {
usleep(GAZELLE_10MS);
continue;
}
@@ -759,7 +759,7 @@ void control_client_thread(void *arg)
while (1) {
/* wait init finish */
- if (posix_api->is_chld) {
+ if (posix_api->ues_posix) {
usleep(GAZELLE_10MS);
continue;
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index df0332b..6675d7b 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -129,26 +129,6 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_
return pool;
}
-struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id)
-{
- char pool_name[PATH_MAX];
- struct rte_mempool *pool;
- int32_t ret;
-
- ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
- if (ret < 0) {
- return NULL;
- }
-
- pool = rte_mempool_create(pool_name, CALL_POOL_SZ, sizeof(struct rpc_msg), 0, 0, NULL, NULL, NULL,
- NULL, rte_socket_id(), 0);
- if (pool == NULL) {
- LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
- }
-
- return pool;
-}
-
static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_id)
{
int ret;
@@ -175,13 +155,13 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return -1;
}
- stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF, RX_MBUF_CACHE_SZ,
+ stack->rx_pktmbuf_pool = create_pktmbuf_mempool("rx_mbuf", RX_NB_MBUF / stack_num, RX_MBUF_CACHE_SZ,
stack->queue_id);
if (stack->rx_pktmbuf_pool == NULL) {
return -1;
}
- stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF, TX_MBUF_CACHE_SZ,
+ stack->tx_pktmbuf_pool = create_pktmbuf_mempool("tx_mbuf", TX_NB_MBUF / stack_num, TX_MBUF_CACHE_SZ,
stack->queue_id);
if (stack->tx_pktmbuf_pool == NULL) {
return -1;
@@ -220,12 +200,14 @@ int32_t create_shared_ring(struct protocol_stack *stack)
lockless_queue_init(&stack->rpc_queue);
if (get_protocol_stack_group()->wakeup_enable) {
- stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, 0, stack->queue_id);
+ stack->wakeup_ring = create_ring("WAKEUP_RING", VDEV_WAKEUP_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ,
+ stack->queue_id);
if (stack->wakeup_ring == NULL) {
return -1;
}
}
+
if (use_ltran()) {
stack->rx_ring = create_ring("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ, stack->queue_id);
if (stack->rx_ring == NULL) {
@@ -255,7 +237,7 @@ int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, ui
struct rte_mbuf *free_buf[FREE_RX_QUEUE_SZ];
while (remain > 0) {
- batch = LWIP_MIN(remain, FREE_RX_QUEUE_SZ);
+ batch = LWIP_MIN(remain, RING_SIZE(FREE_RX_QUEUE_SZ));
ret = gazelle_alloc_pktmbuf(mempool, free_buf, batch);
if (ret != 0) {
@@ -263,7 +245,7 @@ int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, ui
return -1;
}
- ret = rte_ring_en_enqueue_bulk(ring, (void **)free_buf, batch);
+ ret = gazelle_ring_sp_enqueue(ring, (void **)free_buf, batch);
if (ret == 0) {
LSTACK_LOG(ERR, LSTACK, "cannot enqueue to ring, count: %d\n", (int32_t)batch);
return -1;
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index f8e96bf..78040b0 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -143,7 +143,7 @@ static int32_t check_preload_bind_proc(void)
__attribute__((destructor)) void gazelle_network_exit(void)
{
- if (posix_api != NULL && !posix_api->is_chld) {
+ if (posix_api != NULL && !posix_api->ues_posix) {
lwip_exit();
}
@@ -275,7 +275,6 @@ __attribute__((constructor)) void gazelle_network_init(void)
LSTACK_EXIT(1, "stack thread or kernel_event thread failed\n");
}
- posix_api->is_chld = 0;
+ posix_api->ues_posix = 0;
LSTACK_LOG(INFO, LSTACK, "gazelle_network_init success\n");
- rte_smp_mb();
}
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 8544ef7..156fc1f 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -35,33 +35,24 @@
#define HALF_DIVISOR (2)
#define USED_IDLE_WATERMARK (VDEV_IDLE_QUEUE_SZ >> 2)
-void listen_list_add_node(int32_t head_fd, int32_t add_fd)
-{
- struct lwip_sock *sock = NULL;
- int32_t fd = head_fd;
-
- while (fd > 0) {
- sock = get_socket(fd);
- if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
- return;
- }
- fd = sock->nextfd;
- }
- sock->nextfd = add_fd;
-}
+static int32_t lwip_alloc_pbufs(pbuf_layer layer, uint16_t length, pbuf_type type, void **pbufs, uint32_t num);
static void free_ring_pbuf(struct rte_ring *ring)
{
- while (1) {
- struct pbuf *pbuf = NULL;
- int32_t ret = rte_ring_sc_dequeue(ring, (void **)&pbuf);
- if (ret != 0) {
- break;
- }
+ void *pbufs[SOCK_RECV_RING_SIZE];
- pbuf_free(pbuf);
- }
+ do {
+ gazelle_ring_read(ring, pbufs, RING_SIZE(SOCK_RECV_RING_SIZE));
+ gazelle_ring_read_over(ring);
+ } while (gazelle_ring_readable_count(ring));
+
+ do {
+ uint32_t num = gazelle_ring_sc_dequeue(ring, pbufs, RING_SIZE(SOCK_RECV_RING_SIZE));
+
+ for (uint32_t i = 0; i < num; i++) {
+ pbuf_free(pbufs[i]);
+ }
+ } while (gazelle_ring_readover_count(ring));
}
static void reset_sock_data(struct lwip_sock *sock)
@@ -73,11 +64,6 @@ static void reset_sock_data(struct lwip_sock *sock)
}
sock->recv_ring = NULL;
- if (sock->recv_wait_free) {
- free_ring_pbuf(sock->recv_wait_free);
- rte_ring_free(sock->recv_wait_free);
- }
- sock->recv_wait_free = NULL;
if (sock->send_ring) {
free_ring_pbuf(sock->send_ring);
@@ -85,19 +71,11 @@ static void reset_sock_data(struct lwip_sock *sock)
}
sock->send_ring = NULL;
- if (sock->send_idle_ring) {
- free_ring_pbuf(sock->send_idle_ring);
- rte_ring_free(sock->send_idle_ring);
- }
- sock->send_idle_ring = NULL;
sock->stack = NULL;
sock->wakeup = NULL;
- sock->events = 0;
- sock->nextfd = -1;
- sock->attach_fd = -1;
+ sock->listen_next = NULL;
sock->wait_close = false;
- sock->shadowed_sock = NULL;
sock->epoll_events = 0;
sock->events = 0;
@@ -105,34 +83,29 @@ static void reset_sock_data(struct lwip_sock *sock)
pbuf_free(sock->recv_lastdata);
}
sock->recv_lastdata = NULL;
-
- if (sock->send_lastdata) {
- pbuf_free(sock->send_lastdata);
- }
- sock->send_lastdata = NULL;
}
static void replenish_send_idlembuf(struct rte_ring *ring)
{
- uint32_t replenish_cnt = rte_ring_free_count(ring);
+ void *pbuf[SOCK_SEND_RING_SIZE];
- for (uint32_t i = 0; i < replenish_cnt; i++) {
- struct pbuf *pbuf = lwip_alloc_pbuf(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM);
- if (pbuf == NULL) {
- break;
- }
+ uint32_t replenish_cnt = gazelle_ring_free_count(ring);
+ uint32_t alloc_num = LWIP_MIN(replenish_cnt, RING_SIZE(SOCK_SEND_RING_SIZE));
- int32_t ret = rte_ring_sp_enqueue(ring, (void *)pbuf);
- if (ret < 0) {
- pbuf_free(pbuf);
- break;
- }
+ if (lwip_alloc_pbufs(PBUF_TRANSPORT, TCP_MSS, PBUF_RAM, (void **)pbuf, alloc_num) != 0) {
+ return;
+ }
+
+ uint32_t num = gazelle_ring_sp_enqueue(ring, pbuf, alloc_num);
+ for (uint32_t i = num; i < alloc_num; i++) {
+ pbuf_free(pbuf[i]);
}
}
void gazelle_init_sock(int32_t fd)
{
static uint32_t name_tick = 0;
+ struct protocol_stack *stack = get_protocol_stack();
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
return;
@@ -140,38 +113,26 @@ void gazelle_init_sock(int32_t fd)
reset_sock_data(sock);
- sock->recv_ring = create_ring("sock_recv", SOCK_RECV_RING_SIZE, 0, atomic_fetch_add(&name_tick, 1));
+ sock->recv_ring = create_ring("sock_recv", SOCK_RECV_RING_SIZE, RING_F_SP_ENQ | RING_F_SC_DEQ,
+ atomic_fetch_add(&name_tick, 1));
if (sock->recv_ring == NULL) {
LSTACK_LOG(ERR, LSTACK, "sock_recv create failed. errno: %d.\n", rte_errno);
return;
}
- sock->recv_wait_free = create_ring("wait_free", SOCK_RECV_RING_SIZE, 0, atomic_fetch_add(&name_tick, 1));
- if (sock->recv_wait_free == NULL) {
- LSTACK_LOG(ERR, LSTACK, "wait_free create failed. errno: %d.\n", rte_errno);
- return;
- }
-
- sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, 0, atomic_fetch_add(&name_tick, 1));
+ sock->send_ring = create_ring("sock_send", SOCK_SEND_RING_SIZE, RING_F_SP_ENQ | RING_F_SC_DEQ,
+ atomic_fetch_add(&name_tick, 1));
if (sock->send_ring == NULL) {
LSTACK_LOG(ERR, LSTACK, "sock_send create failed. errno: %d.\n", rte_errno);
return;
}
+ replenish_send_idlembuf(sock->send_ring);
- sock->send_idle_ring = create_ring("idle_send", SOCK_SEND_RING_SIZE, 0, atomic_fetch_add(&name_tick, 1));
- if (sock->send_idle_ring == NULL) {
- LSTACK_LOG(ERR, LSTACK, "idle_send create failed. errno: %d.\n", rte_errno);
- return;
- }
- replenish_send_idlembuf(sock->send_idle_ring);
-
- sock->stack = get_protocol_stack();
+ sock->stack = stack;
sock->stack->conn_num++;
- init_list_node(&sock->recv_list);
- init_list_node(&sock->attach_list);
- init_list_node(&sock->listen_list);
- init_list_node(&sock->event_list);
- init_list_node(&sock->send_list);
+ init_list_node_null(&sock->recv_list);
+ init_list_node_null(&sock->event_list);
+ init_list_node_null(&sock->send_list);
}
void gazelle_clean_sock(int32_t fd)
@@ -181,17 +142,18 @@ void gazelle_clean_sock(int32_t fd)
return;
}
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL) {
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+ list_del_node_null(&sock->event_list);
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+ }
+
sock->stack->conn_num--;
reset_sock_data(sock);
- list_del_node_init(&sock->recv_list);
- list_del_node_init(&sock->attach_list);
- list_del_node_init(&sock->listen_list);
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
- list_del_node_init(&sock->event_list);
-#endif
- list_del_node_init(&sock->send_list);
+ list_del_node_null(&sock->recv_list);
+ list_del_node_null(&sock->send_list);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -201,45 +163,14 @@ void gazelle_free_pbuf(struct pbuf *pbuf)
}
struct rte_mbuf *mbuf = pbuf_to_mbuf(pbuf);
- if (mbuf->pool != NULL) {
- rte_pktmbuf_free(mbuf);
- } else {
- rte_free(mbuf);
- }
-}
-
-static int32_t alloc_mbufs(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num)
-{
- // alloc mbuf from pool
- if (rte_pktmbuf_alloc_bulk(pool, mbufs, num) == 0) {
- return 0;
- }
-
- // alloc mbuf from system
- for (uint32_t i = 0; i < num; i++) {
- struct rte_mbuf *mbuf = (struct rte_mbuf *)rte_malloc(NULL, pool->elt_size, sizeof(uint64_t));
- if (mbuf == NULL) {
- for (uint32_t j = 0; j < i; j++) {
- rte_free(mbufs[j]);
- mbufs[j] = NULL;
- }
- return -1;
- }
-
- mbufs[i] = mbuf;
- rte_pktmbuf_init(pool, NULL, mbuf, 0);
- rte_pktmbuf_reset(mbuf);
- mbuf->pool = NULL;
- }
-
- return 0;
+ rte_pktmbuf_free(mbuf);
}
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num)
{
struct pbuf_custom *pbuf_custom = NULL;
- int32_t ret = alloc_mbufs(pool, mbufs, num);
+ int32_t ret = rte_pktmbuf_alloc_bulk(pool, mbufs, num);
if (ret != 0) {
return ret;
}
@@ -252,86 +183,98 @@ int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs,
return 0;
}
-struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type)
+static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, uint16_t length, pbuf_type type)
{
- struct rte_mbuf *mbuf;
- int32_t ret = alloc_mbufs(get_protocol_stack()->tx_pktmbuf_pool, &mbuf, 1);
- if (ret != 0) {
- get_protocol_stack()->stats.tx_allocmbuf_fail++;
- return NULL;
- }
-
struct pbuf_custom *pbuf_custom = mbuf_to_pbuf(mbuf);
pbuf_custom->custom_free_function = gazelle_free_pbuf;
void *data = rte_pktmbuf_mtod(mbuf, void *);
struct pbuf *pbuf = pbuf_alloced_custom(layer, length, type, pbuf_custom, data, MAX_PACKET_SZ);
-
-#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
if (pbuf) {
pbuf->ol_flags = 0;
pbuf->l2_len = 0;
pbuf->l3_len = 0;
}
-#endif
return pbuf;
}
-struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
+static int32_t lwip_alloc_pbufs(pbuf_layer layer, uint16_t length, pbuf_type type, void **bufs, uint32_t num)
{
- struct pbuf *pbuf = NULL;
+ int32_t ret = rte_pktmbuf_alloc_bulk(get_protocol_stack()->tx_pktmbuf_pool, (struct rte_mbuf **)bufs, num);
+ if (ret != 0) {
+ get_protocol_stack()->stats.tx_allocmbuf_fail++;
+ return -1;
+ }
- if (sock->send_lastdata) {
- pbuf = sock->send_lastdata;
- sock->send_lastdata = NULL;
- } else {
- int32_t ret = rte_ring_sc_dequeue(sock->send_ring, (void **)&pbuf);
- if (ret != 0) {
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
- return NULL;
- }
+ for (uint32_t i = 0; i < num; i++) {
+ bufs[i] = init_mbuf_to_pbuf(bufs[i], layer, length, type);
}
- if (pbuf->tot_len >= remain_size) {
- sock->send_lastdata = pbuf;
- *apiflags |= TCP_WRITE_FLAG_MORE; /* set TCP_PSH flag */
+ return 0;
+}
+
+struct pbuf *lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type)
+{
+ struct pbuf *pbuf;
+
+ if (lwip_alloc_pbufs(layer, length, type, (void **)&pbuf, 1) != 0) {
return NULL;
}
- replenish_send_idlembuf(sock->send_idle_ring);
+ return pbuf;
+}
- if ((sock->epoll_events & EPOLLOUT) && rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
+struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8_t *apiflags)
+{
+ struct pbuf *pbuf = NULL;
+
+ if (gazelle_ring_sc_dequeue(sock->send_ring, (void **)&pbuf, 1) != 1) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ return NULL;
}
sock->stack->stats.write_lwip_cnt++;
return pbuf;
}
+static inline void del_data_out_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ /* check again avoid cover event add in stack thread */
+ if (!NETCONN_IS_OUTIDLE(sock)) {
+ sock->events &= ~EPOLLOUT;
+
+ if (sock->events == 0) {
+ list_del_node_null(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
{
- if (sock->events & EPOLLERR) {
+ if (sock->errevent > 0) {
return 0;
}
- uint32_t free_count = rte_ring_free_count(sock->send_ring);
+ uint32_t free_count = gazelle_ring_readable_count(sock->send_ring);
if (free_count == 0) {
return -1;
}
- uint32_t avaible_cont = rte_ring_count(sock->send_idle_ring);
- avaible_cont = LWIP_MIN(free_count, avaible_cont);
-
struct pbuf *pbuf = NULL;
ssize_t send_len = 0;
size_t copy_len;
uint32_t send_pkt = 0;
- while (send_len < len && send_pkt < avaible_cont) {
- int32_t ret = rte_ring_sc_dequeue(sock->send_idle_ring, (void **)&pbuf);
- if (ret < 0) {
- sock->stack->stats.app_write_idlefail++;
+ while (send_len < len && send_pkt < free_count) {
+ if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
+ if (sock->wakeup) {
+ sock->wakeup->stat.app_write_idlefail++;
+ }
break;
}
@@ -339,21 +282,42 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
pbuf_take(pbuf, (char *)buf + send_len, copy_len);
pbuf->tot_len = pbuf->len = copy_len;
- ret = rte_ring_sp_enqueue(sock->send_ring, pbuf);
- if (ret != 0) {
- sock->stack->stats.app_write_drop++;
- pbuf_free(pbuf);
- break;
- }
-
send_len += copy_len;
send_pkt++;
}
- __sync_fetch_and_add(&sock->stack->stats.app_write_cnt, send_pkt);
+ gazelle_ring_read_over(sock->send_ring);
+
+ if (sock->wakeup) {
+ sock->wakeup->stat.app_write_cnt += send_pkt;
+ if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
+ del_data_out_event(sock);
+ }
+ }
return (send_len <= 0) ? -1 : send_len;
}
+static void do_lwip_send(int32_t fd, struct lwip_sock *sock, int32_t flags)
+{
+ /* send all send_ring, so len set lwip send max. */
+ ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ sock->errevent = 1;
+ add_epoll_event(sock->conn, EPOLLERR);
+ }
+
+ if (gazelle_ring_readable_count(sock->send_ring) < SOCK_SEND_REPLENISH_THRES) {
+ replenish_send_idlembuf(sock->send_ring);
+ }
+
+ if (len > 0) {
+ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_OUTIDLE(sock)) {
+ add_epoll_event(sock->conn, EPOLLOUT);
+ }
+ }
+}
+
void stack_send(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
@@ -369,17 +333,11 @@ void stack_send(struct rpc_msg *msg)
return;
}
- /* send all send_ring, so len set lwip send max. */
- ssize_t len = lwip_send(fd, sock, UINT16_MAX, flags);
- if (len == 0) {
- /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
- add_epoll_event(sock->conn, EPOLLERR);
- }
+ do_lwip_send(fd, sock, flags);
/* have remain data add sendlist */
if (NETCONN_IS_DATAOUT(sock)) {
- if (list_is_empty(&sock->send_list)) {
- sock->send_flags = flags;
+ if (list_is_null(&sock->send_list)) {
list_add_node(&sock->stack->send_list, &sock->send_list);
}
sock->stack->stats.send_self_rpc++;
@@ -396,20 +354,14 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
sock = container_of(node, struct lwip_sock, send_list);
if (sock->conn == NULL || !NETCONN_IS_DATAOUT(sock)) {
- list_del_node_init(&sock->send_list);
+ list_del_node_null(&sock->send_list);
continue;
}
- /* send all send_ring, so len set lwip send max. */
- ssize_t len = lwip_send(sock->conn->socket, sock, UINT16_MAX, sock->send_flags);
- if (len == 0) {
- /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
- add_epoll_event(sock->conn, EPOLLERR);
- list_del_node_init(&sock->send_list);
- }
+ do_lwip_send(sock->conn->socket, sock, 0);
if (!NETCONN_IS_DATAOUT(sock)) {
- list_del_node_init(&sock->send_list);
+ list_del_node_null(&sock->send_list);
}
if (++read_num >= send_max) {
@@ -418,26 +370,39 @@ void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
}
}
+static inline void free_recv_ring_readover(struct rte_ring *ring)
+{
+ void *pbufs[SOCK_RECV_RING_SIZE];
+ uint32_t num = gazelle_ring_sc_dequeue(ring, pbufs, RING_SIZE(SOCK_RECV_RING_SIZE));
+ for (uint32_t i = 0; i < num; i++) {
+ pbuf_free(pbufs[i]);
+ }
+}
+
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
{
if (sock->conn->recvmbox == NULL) {
return 0;
}
- if (rte_ring_count(sock->recv_wait_free)) {
- free_ring_pbuf(sock->recv_wait_free);
+ if (gazelle_ring_readover_count(sock->recv_ring) >= SOCK_RECV_FREE_THRES) {
+ free_recv_ring_readover(sock->recv_ring);
+ }
+
+ uint32_t free_count = gazelle_ring_free_count(sock->recv_ring);
+ if (free_count == 0) {
+ GAZELLE_RETURN(EAGAIN);
}
- uint32_t free_count = rte_ring_free_count(sock->recv_ring);
uint32_t data_count = rte_ring_count(sock->conn->recvmbox->ring);
- uint32_t read_max = LWIP_MIN(free_count, data_count);
- struct pbuf *pbuf = NULL;
+ uint32_t read_num = LWIP_MIN(free_count, data_count);
+ read_num = LWIP_MIN(read_num, SOCK_RECV_RING_SIZE);
+ struct pbuf *pbufs[SOCK_RECV_RING_SIZE];
uint32_t read_count = 0;
ssize_t recv_len = 0;
- int32_t ret;
- for (uint32_t i = 0; i < read_max; i++) {
- err_t err = netconn_recv_tcp_pbuf_flags(sock->conn, &pbuf, apiflags);
+ for (uint32_t i = 0; i < read_num; i++) {
+ err_t err = netconn_recv_tcp_pbuf_flags(sock->conn, &pbufs[i], apiflags);
if (err != ERR_OK) {
if (recv_len > 0) {
/* already received data, return that (this trusts in getting the same error from
@@ -448,35 +413,28 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
return (err == ERR_CLSD) ? 0 : -1;
}
- if (!(flags & MSG_PEEK)) {
- ret = rte_ring_sp_enqueue(sock->recv_ring, pbuf);
- if (ret != 0) {
- pbuf_free(pbuf);
- sock->stack->stats.read_lwip_drop++;
- break;
- }
- read_count++;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_LWIP);
- }
-
- recv_len += pbuf->len;
+ recv_len += pbufs[i]->tot_len;
+ read_count++;
/* once we have some data to return, only add more if we don't need to wait */
apiflags |= NETCONN_DONTBLOCK | NETCONN_NOFIN;
}
- if (data_count > read_count) {
- add_recv_list(sock->conn->socket);
+ if (!(flags & MSG_PEEK)) {
+ uint32_t enqueue_num = gazelle_ring_sp_enqueue(sock->recv_ring, (void **)pbufs, read_count);
+ for (uint32_t i = enqueue_num; i < read_count; i++) {
+ /* update receive window */
+ tcp_recved(sock->conn->pcb.tcp, pbufs[i]->tot_len);
+ pbuf_free(pbufs[i]);
+ sock->stack->stats.read_lwip_drop++;
+ }
}
- if (recv_len > 0 && (flags & MSG_PEEK) == 0) {
- add_epoll_event(sock->conn, EPOLLIN);
+ for (uint32_t i = 0; get_protocol_stack_group()->latency_start && i < read_count; i++) {
+ calculate_lstack_latency(&sock->stack->latency, pbufs[i], GAZELLE_LATENCY_LWIP);
}
- sock->stack->stats.read_lwip_cnt += read_count;
+ sock->stack->stats.read_lwip_cnt += read_count;
if (recv_len == 0) {
GAZELLE_RETURN(EAGAIN);
}
@@ -532,14 +490,12 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
GAZELLE_RETURN(EINVAL);
}
- sock->send_flags = flags;
ssize_t send = write_stack_data(sock, buf, len);
if (send < 0) {
GAZELLE_RETURN(EAGAIN);
} else if (send == 0) {
return 0;
}
- rte_smp_mb();
rpc_call_send(fd, NULL, send, flags);
return send;
@@ -574,22 +530,52 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
return buflen;
}
+static inline void del_data_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ /* check again avoid cover event add in stack thread */
+ if (!NETCONN_IS_DATAIN(sock)) {
+ sock->events &= ~EPOLLIN;
+
+ if (sock->events == 0) {
+ list_del_node_null(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+static struct pbuf *pbuf_free_partial(struct pbuf *pbuf, uint16_t free_len)
+{
+ while (free_len && pbuf) {
+ if (free_len >= pbuf->len) {
+ struct pbuf *p = pbuf;
+ pbuf = pbuf->next;
+ free_len = free_len - p->len;
+ } else {
+ pbuf_remove_header(pbuf, free_len);
+ break;
+ }
+ }
+
+ return pbuf;
+}
+
ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
{
size_t recv_left = len;
struct pbuf *pbuf = NULL;
ssize_t recvd = 0;
- int32_t ret;
- u16_t copy_len;
+ uint16_t copy_len;
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
LSTACK_LOG(ERR, LSTACK, "get_socket null fd %d.\n", fd);
GAZELLE_RETURN(EINVAL);
}
- sock->recv_flags = flags;
- if ((sock->events & EPOLLERR) && !NETCONN_IS_DATAIN(sock)) {
+ if (sock->errevent > 0) {
return 0;
}
@@ -598,35 +584,39 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
pbuf = sock->recv_lastdata;
sock->recv_lastdata = NULL;
} else {
- ret = rte_ring_sc_dequeue(sock->recv_ring, (void **)&pbuf);
- if (ret != 0) {
+ if (gazelle_ring_read(sock->recv_ring, (void **)&pbuf, 1) != 1) {
break;
}
}
- copy_len = (recv_left > pbuf->tot_len) ? pbuf->tot_len : (u16_t)recv_left;
+ copy_len = (recv_left > pbuf->len) ? pbuf->len : (uint16_t)recv_left;
pbuf_copy_partial(pbuf, (char *)buf + recvd, copy_len, 0);
recvd += copy_len;
recv_left -= copy_len;
- if (pbuf->tot_len > copy_len) {
- sock->recv_lastdata = pbuf_free_header(pbuf, copy_len);
+ if (pbuf->len > copy_len || pbuf->next) {
+ sock->recv_lastdata = pbuf_free_partial(pbuf, copy_len);
} else {
+ if (sock->wakeup) {
+ sock->wakeup->stat.app_read_cnt += 1;
+ }
if (get_protocol_stack_group()->latency_start) {
calculate_lstack_latency(&sock->stack->latency, pbuf, GAZELLE_LATENCY_READ);
}
- ret = rte_ring_sp_enqueue(sock->recv_wait_free, pbuf);
- if (ret != 0) {
- pbuf_free(pbuf);
- }
- sock->recv_lastdata = NULL;
- __sync_fetch_and_add(&sock->stack->stats.app_read_cnt, 1);
+ gazelle_ring_read_over(sock->recv_ring);
}
}
+ /* rte_ring_count reduce lock */
+ if (sock->wakeup && sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLIN)) {
+ del_data_in_event(sock);
+ }
+
if (recvd == 0) {
- sock->stack->stats.read_null++;
+ if (sock->wakeup) {
+ sock->wakeup->stat.read_null++;
+ }
GAZELLE_RETURN(EAGAIN);
}
return recvd;
@@ -634,9 +624,9 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
void add_recv_list(int32_t fd)
{
- struct lwip_sock *sock = get_socket(fd);
+ struct lwip_sock *sock = get_socket_by_fd(fd);
- if (sock->stack && list_is_empty(&sock->recv_list)) {
+ if (sock && sock->stack && list_is_null(&sock->recv_list)) {
list_add_node(&sock->stack->recv_list, &sock->recv_list);
}
}
@@ -648,24 +638,26 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num)
struct lwip_sock *sock;
uint32_t read_num = 0;
+ struct list_node *last_node = list->prev;
list_for_each_safe(node, temp, list) {
sock = container_of(node, struct lwip_sock, recv_list);
- if (sock->conn == NULL || sock->recv_ring == NULL || sock->send_ring == NULL || sock->conn->pcb.tcp == NULL) {
- list_del_node_init(&sock->recv_list);
+ if (sock->conn == NULL || sock->conn->recvmbox == NULL || rte_ring_count(sock->conn->recvmbox->ring) == 0) {
+ list_del_node_null(&sock->recv_list);
continue;
}
- if (rte_ring_free_count(sock->recv_ring)) {
- list_del_node_init(&sock->recv_list);
- ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, sock->recv_flags);
- if (len == 0) {
- /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
- add_epoll_event(sock->conn, EPOLLERR);
- }
+ ssize_t len = lwip_recv(sock->conn->socket, NULL, 0, 0);
+ if (len == 0) {
+ /* FIXME: should use POLLRDHUP, when connection be closed. lwip event-callback no POLLRDHUP */
+ sock->errevent = 1;
+ add_epoll_event(sock->conn, EPOLLERR);
+ } else if (len > 0) {
+ add_epoll_event(sock->conn, EPOLLIN);
}
- if (++read_num >= max_num) {
+ /* last_node:recv only once per sock. max_num avoid cost too much time this loop */
+ if (++read_num >= max_num || last_node == node) {
break;
}
}
@@ -684,14 +676,14 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
if (netconn != NULL && netconn->recvmbox != NULL) {
conn->recv_cnt = rte_ring_count(netconn->recvmbox->ring);
+ conn->fd = netconn->socket;
struct lwip_sock *sock = get_socket(netconn->socket);
if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
- conn->recv_ring_cnt = rte_ring_count(sock->recv_ring);
+ conn->recv_ring_cnt = gazelle_ring_readable_count(sock->recv_ring);
conn->recv_ring_cnt += (sock->recv_lastdata) ? 1 : 0;
- conn->send_ring_cnt = rte_ring_count(sock->send_ring);
- conn->send_ring_cnt += (sock->send_lastdata) ? 1 : 0;
+ conn->send_ring_cnt = gazelle_ring_readover_count(sock->send_ring);
if (sock->wakeup) {
sem_getvalue(&sock->wakeup->event_sem, &conn->sem_cnt);
@@ -756,9 +748,11 @@ void create_shadow_fd(struct rpc_msg *msg)
}
clone_lwip_socket_opt(clone_sock, sock);
- clone_sock->shadowed_sock = sock;
- listen_list_add_node(fd, clone_fd);
+ while (sock->listen_next) {
+ sock = sock->listen_next;
+ }
+ sock->listen_next = clone_sock;
int32_t ret = lwip_bind(clone_fd, addr, addr_len);
if (ret < 0) {
@@ -843,11 +837,6 @@ static uint32_t get_list_count(struct list_node *list)
return count;
}
-void stack_eventlist_count(struct rpc_msg *msg)
-{
- msg->result = get_list_count(&get_protocol_stack()->event_list);
-}
-
void stack_sendlist_count(struct rpc_msg *msg)
{
msg->result = get_list_count(&get_protocol_stack()->send_list);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 88513ba..a1f3790 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -11,6 +11,7 @@
*/
#define _GNU_SOURCE
#include <pthread.h>
+#include <stdatomic.h>
#include <lwip/sockets.h>
#include <lwip/tcpip.h>
@@ -30,6 +31,7 @@
#include "lstack_protocol_stack.h"
#include "lstack_cfg.h"
#include "lstack_control_plane.h"
+#include "posix/lstack_epoll.h"
#include "lstack_stack_stat.h"
#define READ_LIST_MAX 32
@@ -39,7 +41,6 @@
static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
static struct protocol_stack_group g_stack_group = {0};
-static PER_THREAD long g_stack_tid = 0;
void set_init_fail(void);
typedef void *(*stack_thread_func)(void *arg);
@@ -66,6 +67,8 @@ static inline void set_stack_idx(uint16_t idx)
long get_stack_tid(void)
{
+ static PER_THREAD long g_stack_tid = 0;
+
if (g_stack_tid == 0) {
g_stack_tid = syscall(__NR_gettid);
}
@@ -96,17 +99,37 @@ struct protocol_stack *get_protocol_stack_by_fd(int32_t fd)
return sock->stack;
}
-struct protocol_stack *get_minconn_protocol_stack(void)
+struct protocol_stack *get_bind_protocol_stack(void)
{
- int32_t min_index = 0;
+ static PER_THREAD struct protocol_stack *bind_stack = NULL;
+
+ /* same app communication thread bind same stack */
+ if (bind_stack) {
+ return bind_stack;
+ }
- for (int32_t i = 1; i < g_stack_group.stack_num; i++) {
- if (g_stack_group.stacks[i]->conn_num < g_stack_group.stacks[min_index]->conn_num) {
- min_index = i;
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t index = 0;
+
+ /* close listen shadow, per app communication thread select only one stack */
+ if (get_global_cfg_params()->listen_shadow == false) {
+ static uint16_t stack_index = 0;
+ index = atomic_fetch_add(&stack_index, 1);
+ if (index >= stack_group->stack_num) {
+ LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
+ return NULL;
+ }
+ /* use listen shadow, app communication thread maybe more than stack num, select the least load stack */
+ } else {
+ for (uint16_t i = 1; i < stack_group->stack_num; i++) {
+ if (stack_group->stacks[i]->conn_num < stack_group->stacks[index]->conn_num) {
+ index = i;
+ }
}
}
- return g_stack_group.stacks[min_index];
+ bind_stack = stack_group->stacks[index];
+ return stack_group->stacks[index];
}
void lstack_low_power_idling(void)
@@ -193,7 +216,7 @@ static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_
return 0;
}
-static void* gazelle_weakup_thread(void *arg)
+static void* gazelle_wakeup_thread(void *arg)
{
uint16_t queue_id = *(uint16_t *)arg;
struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
@@ -203,17 +226,13 @@ static void* gazelle_weakup_thread(void *arg)
LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
+ sem_t *event_sem[WAKEUP_MAX_NUM];
+ int num;
for (;;) {
- if (rte_ring_count(stack->wakeup_ring) == 0) {
- continue;
- }
-
- sem_t *event_sem;
- if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) {
- continue;
+ num = gazelle_light_ring_dequeue_burst(stack->wakeup_ring, (void **)event_sem, WAKEUP_MAX_NUM);
+ for (int i = 0; i < num; i++) {
+ sem_post(event_sem[i]);
}
-
- sem_post(event_sem);
}
return NULL;
@@ -233,12 +252,8 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
- init_list_node(&stack->listen_list);
- init_list_node(&stack->event_list);
init_list_node(&stack->send_list);
- pthread_spin_init(&stack->event_lock, PTHREAD_PROCESS_SHARED);
-
sys_calibrate_tsc();
stack_stat_init();
@@ -297,8 +312,10 @@ static void* gazelle_kernel_event(void *arg)
}
for (int32_t i = 0; i < event_num; i++) {
- if (events[i].data.ptr) {
- sem_post((sem_t *)events[i].data.ptr);
+ struct wakeup_poll *wakeup = events[i].data.ptr;
+ if (wakeup) {
+ __atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE);
+ sem_post(&wakeup->event_sem);
}
}
}
@@ -311,7 +328,7 @@ static int32_t create_companion_thread(struct protocol_stack_group *stack_group,
int32_t ret;
if (stack_group->wakeup_enable) {
- ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
+ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_wakeup_thread);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "gazelleweakup ret=%d errno=%d\n", ret, errno);
return ret;
@@ -339,13 +356,11 @@ static struct protocol_stack * stack_thread_init(uint16_t queue_id)
struct protocol_stack *stack = malloc(sizeof(*stack));
if (stack == NULL) {
- sem_post(&stack_group->thread_phase1);
LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n");
return NULL;
}
if (init_stack_value(stack, queue_id) != 0) {
- sem_post(&stack_group->thread_phase1);
free(stack);
return NULL;
}
@@ -358,7 +373,6 @@ static struct protocol_stack * stack_thread_init(uint16_t queue_id)
if (use_ltran()) {
if (client_reg_thrd_ring() != 0) {
- sem_post(&stack_group->thread_phase1);
free(stack);
return NULL;
}
@@ -419,6 +433,8 @@ static int32_t init_protocol_sem(void)
int32_t ret;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ pthread_spin_init(&stack_group->wakeup_list_lock, PTHREAD_PROCESS_PRIVATE);
+
if (!use_ltran()) {
ret = sem_init(&stack_group->ethdev_init, 0, 0);
if (ret < 0) {
@@ -449,6 +465,7 @@ int32_t init_protocol_stack(void)
stack_group->stack_num = get_global_cfg_params()->num_cpu;
stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
+ stack_group->wakeup_list = NULL;
if (init_protocol_sem() != 0) {
return -1;
@@ -486,58 +503,10 @@ void stack_socket(struct rpc_msg *msg)
}
}
-static inline bool is_real_close(int32_t fd)
-{
- struct lwip_sock *sock = get_socket_by_fd(fd);
-
- /* last sock */
- if (list_is_empty(&sock->attach_list)) {
- return true;
- }
-
- /* listen sock, but have attach sock */
- if (sock->attach_fd == fd) {
- sock->wait_close = true;
- return false;
- } else { /* attach sock */
- /* listen sock is normal */
- struct lwip_sock *listen_sock = get_socket_by_fd(sock->attach_fd);
- if (listen_sock == NULL || !listen_sock->wait_close) {
- list_del_node_init(&sock->attach_list);
- return true;
- }
-
- /* listen sock is wait clsoe. check this is last attach sock */
- struct list_node *list = &(sock->attach_list);
- struct list_node *node, *temp;
- uint32_t list_count = 0;
- list_for_each_safe(node, temp, list) {
- list_count++;
- }
- /* 2:listen sock is wait close and closing attach sock. close listen sock here */
- if (list_count == 2) {
- lwip_close(listen_sock->attach_fd);
- gazelle_clean_sock(listen_sock->attach_fd);
- posix_api->close_fn(listen_sock->attach_fd);
- list_del_node_init(&listen_sock->attach_list);
- }
- list_del_node_init(&sock->attach_list);
- return true;
- }
-
- list_del_node_init(&sock->attach_list);
- return true;
-}
-
void stack_close(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
- if (!is_real_close(fd)) {
- msg->result = 0;
- return;
- }
-
msg->result = lwip_close(fd);
if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
@@ -556,26 +525,8 @@ void stack_bind(struct rpc_msg *msg)
}
}
-static inline struct lwip_sock *reuse_listen(struct protocol_stack *stack, struct lwip_sock *listen_sock)
-{
- struct list_node *node, *temp;
- struct list_node *list = &(stack->listen_list);
- struct lwip_sock *sock;
-
- list_for_each_safe(node, temp, list) {
- sock = container_of(node, struct lwip_sock, listen_list);
- if (sock->conn->pcb.tcp->local_port == listen_sock->conn->pcb.tcp->local_port &&
- sock->conn->pcb.tcp->local_ip.addr == listen_sock->conn->pcb.tcp->local_ip.addr) {
- return sock;
- }
- }
-
- return NULL;
-}
-
void stack_listen(struct rpc_msg *msg)
{
- struct protocol_stack *stack = get_protocol_stack();
int32_t fd = msg->args[MSG_ARG_0].i;
int32_t backlog = msg->args[MSG_ARG_1].i;
@@ -585,25 +536,9 @@ void stack_listen(struct rpc_msg *msg)
return;
}
- /* stack have listen same ip+port, then attach to old listen */
- struct lwip_sock *listen_sock = reuse_listen(stack, sock);
- if (listen_sock) {
- if (list_is_empty(&sock->attach_list)) {
- list_add_node(&listen_sock->attach_list, &sock->attach_list);
- }
- sock->attach_fd = listen_sock->conn->socket;
- msg->result = 0;
- return;
- }
-
/* new listen add to stack listen list */
msg->result = lwip_listen(fd, backlog);
- if (msg->result == 0) {
- if (list_is_empty(&sock->listen_list)) {
- list_add_node(&stack->listen_list, &sock->listen_list);
- }
- sock->attach_fd = fd;
- } else {
+ if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
}
@@ -611,28 +546,35 @@ void stack_listen(struct rpc_msg *msg)
void stack_accept(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
+ msg->result = -1;
int32_t accept_fd = lwip_accept(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
- if (accept_fd > 0) {
- struct lwip_sock *sock = get_socket(accept_fd);
- if (sock && sock->stack) {
- msg->result = accept_fd;
- return;
- }
+ if (accept_fd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
+ }
+ struct lwip_sock *sock = get_socket(accept_fd);
+ if (sock == NULL || sock->stack == NULL) {
lwip_close(accept_fd);
gazelle_clean_sock(accept_fd);
posix_api->close_fn(accept_fd);
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
}
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d attach_fd %d failed %d\n", get_stack_tid(), msg->args[MSG_ARG_0].i,
- fd, accept_fd);
- msg->result = -1;
+ msg->result = accept_fd;
+ if (rte_ring_count(sock->conn->recvmbox->ring)) {
+ add_recv_list(accept_fd);
+ }
}
void stack_connect(struct rpc_msg *msg)
{
msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
+ if (msg->result < 0) {
+ msg->result = -errno;
+ }
}
void stack_getpeername(struct rpc_msg *msg)
@@ -723,6 +665,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
ret = gazelle_alloc_pktmbuf(stack->rx_pktmbuf_pool, &mbuf_copy, 1);
if (ret != 0) {
+ stack->stats.rx_allocmbuf_fail++;
return;
}
copy_mbuf(mbuf_copy, mbuf);
@@ -737,22 +680,28 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
int32_t stack_broadcast_close(int32_t fd)
{
- struct lwip_sock *sock = NULL;
- int32_t next_fd;
+ struct lwip_sock *sock = get_socket(fd);
+ int32_t ret = 0;
+
+ do {
+ sock = sock->listen_next;
+ if (rpc_call_close(fd)) {
+ ret = -1;
+ }
- while (fd > 0) {
- sock = get_socket(fd);
if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EINVAL);
+ break;
}
- next_fd = sock->nextfd;
+ fd = sock->conn->socket;
+ } while (sock);
- rpc_call_close(fd);
- fd = next_fd;
- }
+ return ret;
+}
- return 0;
+/* choice one stack listen */
+int32_t stack_single_listen(int32_t fd, int32_t backlog)
+{
+ return rpc_call_listen(fd, backlog);
}
/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
@@ -797,44 +746,59 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
return 0;
}
-/* ergodic the protocol stack thread to find the connection, because all threads are listening */
-int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
+static struct lwip_sock *get_min_accept_sock(int32_t fd)
{
struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL || sock->attach_fd < 0) {
- errno = EINVAL;
- return -1;
- }
- fd = sock->attach_fd;
-
struct lwip_sock *min_sock = NULL;
- int32_t min_fd = fd;
- while (fd > 0) {
- sock = get_socket(fd);
- if (sock == NULL) {
- GAZELLE_RETURN(EINVAL);
- }
- struct lwip_sock *attach_sock = get_socket(sock->attach_fd);
- if (attach_sock == NULL) {
- GAZELLE_RETURN(EINVAL);
- }
- if (!NETCONN_IS_ACCEPTIN(attach_sock)) {
- fd = sock->nextfd;
+ while (sock) {
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock = sock->listen_next;
continue;
}
- if (min_sock == NULL || min_sock->stack->conn_num > attach_sock->stack->conn_num) {
- min_sock = attach_sock;
- min_fd = sock->attach_fd;
+ if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
+ min_sock = sock;
}
- fd = sock->nextfd;
+ sock = sock->listen_next;
+ }
+
+ return min_sock;
+}
+
+static void inline del_accept_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock->events &= ~EPOLLIN;
+ if (sock->events == 0) {
+ list_del_node_null(&sock->event_list);
+ }
}
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+/* ergodic the protocol stack thread to find the connection, because all threads are listening */
+int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
+{
int32_t ret = -1;
- if (min_sock) {
- ret = rpc_call_accept(min_fd, addr, addrlen);
+
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ struct lwip_sock *min_sock = get_min_accept_sock(fd);
+ if (min_sock && min_sock->conn) {
+ ret = rpc_call_accept(min_sock->conn->socket, addr, addrlen);
+ }
+
+ if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
+ del_accept_in_event(min_sock);
}
if (ret < 0) {
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 743857f..06fac5c 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -25,6 +25,7 @@
#include "gazelle_dfx_msg.h"
#include "lstack_thread_rpc.h"
#include "lstack_stack_stat.h"
+#include "posix/lstack_epoll.h"
#define US_PER_SEC 1000000
@@ -87,6 +88,68 @@ static void set_latency_start_flag(bool start)
}
}
+void register_wakeup(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ pthread_spin_lock(&stack_group->wakeup_list_lock);
+
+ wakeup->next = stack_group->wakeup_list;
+ stack_group->wakeup_list = wakeup;
+
+ pthread_spin_unlock(&stack_group->wakeup_list_lock);
+}
+
+void unregister_wakeup(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ pthread_spin_lock(&stack_group->wakeup_list_lock);
+
+ struct wakeup_poll *node = stack_group->wakeup_list;
+ struct wakeup_poll *pre = NULL;
+
+ while(node && node != wakeup) {
+ pre = node;
+ node = node->next;
+ }
+
+ if (node == NULL) {
+ pthread_spin_unlock(&stack_group->wakeup_list_lock);
+ return;
+ }
+
+ if (pre) {
+ pre->next = node->next;
+ } else {
+ stack_group->wakeup_list = node->next;
+ }
+
+ pthread_spin_unlock(&stack_group->wakeup_list_lock);
+}
+
+static void get_wakeup_stat(struct protocol_stack *stack, struct gazelle_wakeup_stat *stat)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
+ pthread_spin_lock(&stack_group->wakeup_list_lock);
+
+ struct wakeup_poll *node = stack_group->wakeup_list;
+ while (node) {
+ if (node->bind_stack == stack) {
+ stat->app_events += node->stat.app_events;
+ stat->read_null += node->stat.read_null;
+ stat->app_write_cnt += node->stat.app_write_cnt;
+ stat->app_write_idlefail += node->stat.app_write_idlefail;
+ stat->app_read_cnt += node->stat.app_read_cnt;
+ }
+
+ node = node->next;
+ }
+
+ pthread_spin_unlock(&stack_group->wakeup_list_lock);
+}
+
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info)
{
struct cfg_params *cfg = get_global_cfg_params();
@@ -102,21 +165,24 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
struct protocol_stack_group *stack_group = get_protocol_stack_group();
dfx->loglevel = rte_log_get_level(RTE_LOGTYPE_LSTACK);
+
lstack_get_low_power_info(&dfx->low_power_info);
- memcpy_s(&dfx->data.pkts, sizeof(dfx->data.pkts), &stack->stats, sizeof(dfx->data.pkts));
+
+ memcpy_s(&dfx->data.pkts.stack_stat, sizeof(struct gazelle_stack_stat), &stack->stats,
+ sizeof(struct gazelle_stack_stat));
+
+ get_wakeup_stat(stack, &dfx->data.pkts.wakeup_stat);
+
dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
int32_t rpc_call_result = rpc_call_msgcnt(stack);
dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
rpc_call_result = rpc_call_recvlistcnt(stack);
- dfx->data.pkts.recv_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
-
- rpc_call_result = rpc_call_eventlistcnt(stack);
- dfx->data.pkts.event_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
+ dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
rpc_call_result = rpc_call_sendlistcnt(stack);
- dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
+ dfx->data.pkts.send_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
dfx->data.pkts.conn_num = stack->conn_num;
}
@@ -182,6 +248,8 @@ int32_t handle_stack_cmd(int32_t fd, enum GAZELLE_STAT_MODE stat_mode)
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
struct protocol_stack *stack = stack_group->stacks[i];
+
+ memset_s(&dfx, sizeof(dfx), 0, sizeof(dfx));
get_stack_dfx_data(&dfx, stack, stat_mode);
if (!use_ltran() &&
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 312e192..8937920 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -23,34 +23,53 @@
#include "lstack_dpdk.h"
#include "lstack_thread_rpc.h"
-static PER_THREAD struct rte_mempool *rpc_pool = NULL;
+#define RPC_MSG_MAX 32
+struct rpc_msg_pool {
+ struct rpc_msg msgs[RPC_MSG_MAX];
+ uint32_t prod __rte_cache_aligned;
+ uint32_t cons __rte_cache_aligned;
+};
+
+static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
+
+static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
+{
+ uint32_t cons = __atomic_load_n(&rpc_pool->cons, __ATOMIC_ACQUIRE);
+ uint32_t prod = rpc_pool->prod + 1;
+
+ if (prod == cons) {
+ return NULL;
+ }
+
+ rpc_pool->prod = prod;
+ return &rpc_pool->msgs[prod];
+}
static inline __attribute__((always_inline))
struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
{
- int32_t ret;
struct rpc_msg *msg = NULL;
if (stack == NULL) {
return NULL;
}
- static uint16_t pool_index = 0;
- if (rpc_pool == NULL) {
- rpc_pool = create_rpc_mempool("rpc_msg", atomic_fetch_add(&pool_index, 1));
- if (rpc_pool == NULL) {
+ if (g_rpc_pool == NULL) {
+ g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool));
+ if (g_rpc_pool == NULL) {
+ get_protocol_stack_group()->call_alloc_fail++;
return NULL;
}
}
- ret = rte_mempool_get(rpc_pool, (void **)&msg);
- if (ret < 0) {
+ msg = get_rpc_msg(g_rpc_pool);
+ if (msg == NULL) {
get_protocol_stack_group()->call_alloc_fail++;
return NULL;
}
- msg->pool = rpc_pool;
+ msg->pool = g_rpc_pool;
- pthread_spin_init(&msg->lock, PTHREAD_PROCESS_SHARED);
+ pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
msg->func = func;
msg->self_release = 1;
@@ -64,7 +83,8 @@ void rpc_msg_free(struct rpc_msg *msg)
msg->self_release = 0;
msg->func = NULL;
- rte_mempool_put(msg->pool, (void *)msg);
+
+ atomic_fetch_add(&msg->pool->cons, 1);
}
static inline __attribute__((always_inline))
@@ -109,8 +129,6 @@ void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
stack->stats.call_null++;
}
- rte_mb();
-
if (msg->self_release) {
pthread_spin_unlock(&msg->lock);
} else {
@@ -192,16 +210,6 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
return rpc_sync_call(&stack->rpc_queue, msg);
}
-int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_eventlist_count);
- if (msg == NULL) {
- return -1;
- }
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count);
@@ -222,28 +230,6 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
return rpc_sync_call(&stack->rpc_queue, msg);
}
-void add_epoll_event(struct netconn *conn, uint32_t event);
-static void rpc_add_event(struct rpc_msg *msg)
-{
- struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
- if (sock->conn) {
- add_epoll_event(sock->conn, sock->events);
- }
-}
-
-void rpc_call_addevent(struct protocol_stack *stack, void *sock)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_add_event);
- if (msg == NULL) {
- return;
- }
-
- msg->args[MSG_ARG_0].p = sock;
-
- msg->self_release = 0;
- rpc_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp);
@@ -260,7 +246,7 @@ int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol)
{
- struct protocol_stack *stack = get_minconn_protocol_stack();
+ struct protocol_stack *stack = get_bind_protocol_stack();
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_socket);
if (msg == NULL) {
return -1;
@@ -342,7 +328,12 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ int32_t ret = rpc_sync_call(&stack->rpc_queue, msg);
+ if (ret < 0) {
+ errno = -ret;
+ return -1;
+ }
+ return ret;
}
int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 987828d..aeffbb3 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -75,6 +75,7 @@ struct cfg_params {
uint32_t lpm_pkts_in_detect;
bool use_ltran; // ture:lstack read from nic false:read form ltran
bool kni_switch;
+ bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread.
int dpdk_argc;
char **dpdk_argv;
struct secondary_attach_arg sec_attach_arg;
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index bb9be21..6ffcc41 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -23,7 +23,7 @@
#include "dpdk_common.h"
struct protocol_stack;
-#define RX_NB_MBUF ((5 * (MAX_CLIENTS / 4)) + (VDEV_RX_QUEUE_SZ * DEFAULT_BACKUP_RING_SIZE_FACTOR))
+#define RX_NB_MBUF ((5 * MAX_CLIENTS) + (VDEV_RX_QUEUE_SZ * DEFAULT_BACKUP_RING_SIZE_FACTOR))
#define RX_MBUF_CACHE_SZ (VDEV_RX_QUEUE_SZ)
#define TX_NB_MBUF (128 * DEFAULT_RING_SIZE)
#define TX_MBUF_CACHE_SZ (DEFAULT_RING_SIZE)
@@ -34,13 +34,13 @@ struct protocol_stack;
#define MAX_PACKET_SZ 2048
+#define RING_SIZE(x) ((x) - 1)
#define MBUF_SZ (MAX_PACKET_SZ + RTE_PKTMBUF_HEADROOM)
#define MAX_CORE_NUM 256
#define CALL_MSG_RING_SIZE (unsigned long long)32
#define CALL_CACHE_SZ 0
-#define CALL_POOL_SZ 128
/* Layout:
* | rte_mbuf | pbuf | custom_free_function | payload |
@@ -62,7 +62,6 @@ int32_t dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num);
struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, int32_t queue_id);
int32_t create_shared_ring(struct protocol_stack *stack);
-struct rte_mempool *create_rpc_mempool(const char *name, uint16_t queue_id);
void lstack_log_level_init(void);
int dpdk_ethdev_init(void);
int dpdk_ethdev_start(void);
diff --git a/src/lstack/include/lstack_lockless_queue.h b/src/lstack/include/lstack_lockless_queue.h
index c00d3a2..c70b56a 100644
--- a/src/lstack/include/lstack_lockless_queue.h
+++ b/src/lstack/include/lstack_lockless_queue.h
@@ -1,5 +1,13 @@
/*
-* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
*/
#ifndef __GAZELLE_LOCKLESS_QUEUE_H__
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index c73e3a7..ba57541 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -14,18 +14,16 @@
#define __GAZELLE_LWIP_H__
#include "lstack_thread_rpc.h"
+#include "dpdk_common.h"
#include "lwipsock.h"
-#define SOCK_RECV_RING_SIZE (128)
-#define SOCK_SEND_RING_SIZE (32)
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
-#define NETCONN_IS_DATAIN(sock) ((rte_ring_count((sock)->recv_ring) || (sock)->recv_lastdata))
-#define NETCONN_IS_DATAOUT(sock) ((rte_ring_count((sock)->send_ring) || (sock)->send_lastdata))
-#define NETCONN_IS_OUTIDLE(sock) rte_ring_free_count((sock)->send_ring)
+#define NETCONN_IS_DATAIN(sock) ((gazelle_ring_readable_count((sock)->recv_ring) || (sock)->recv_lastdata))
+#define NETCONN_IS_DATAOUT(sock) gazelle_ring_readover_count((sock)->send_ring)
+#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
void create_shadow_fd(struct rpc_msg *msg);
-void listen_list_add_node(int32_t head_fd, int32_t add_fd);
void gazelle_init_sock(int32_t fd);
int32_t gazelle_socket(int domain, int type, int protocol);
void gazelle_clean_sock(int32_t fd);
@@ -37,7 +35,6 @@ void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
void stack_sendlist_count(struct rpc_msg *msg);
-void stack_eventlist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index bc4e4bd..8a6aa9d 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -14,48 +14,52 @@
#define __GAZELLE_PROTOCOL_STACK_H__
#include <semaphore.h>
-#include <pthread.h>
#include <lwip/list.h>
#include <lwip/netif.h>
+#include <rte_common.h>
#include "dpdk_common.h"
#include "lstack_thread_rpc.h"
#include "gazelle_dfx_msg.h"
#include "lstack_lockless_queue.h"
+#define SOCK_RECV_RING_SIZE (128)
+#define SOCK_RECV_FREE_THRES (32)
+#define SOCK_SEND_RING_SIZE (32)
+#define SOCK_SEND_REPLENISH_THRES (16)
+#define WAKEUP_MAX_NUM (32)
+
struct protocol_stack {
uint32_t tid;
uint16_t queue_id;
uint16_t port_id;
uint16_t socket_id;
uint16_t cpu_id;
- volatile uint16_t conn_num;
cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */
+ int32_t epollfd; /* kernel event thread epoll fd */
- lockless_queue rpc_queue;
struct rte_mempool *rx_pktmbuf_pool;
struct rte_mempool *tx_pktmbuf_pool;
struct rte_ring *rx_ring;
struct rte_ring *tx_ring;
struct rte_ring *reg_ring;
struct rte_ring *wakeup_ring;
-
struct reg_ring_msg *reg_buf;
+ volatile uint16_t conn_num __rte_cache_aligned;
+ lockless_queue rpc_queue __rte_cache_aligned;
+ char pad __rte_cache_aligned;
+
struct netif netif;
+ struct eth_dev_ops *dev_ops;
uint32_t rx_ring_used;
uint32_t tx_ring_used;
- struct eth_dev_ops *dev_ops;
struct list_node recv_list;
- struct list_node listen_list;
struct list_node send_list;
- struct list_node event_list;
- pthread_spinlock_t event_lock;
- int32_t epollfd; /* kernel event thread epoll fd */
- struct gazelle_stat_pkts stats;
- struct gazelle_stack_latency latency;
struct stats_ *lwip_stats;
+ struct gazelle_stack_latency latency;
+ struct gazelle_stack_stat stats __rte_cache_aligned;
};
struct eth_params;
@@ -74,12 +78,14 @@ struct protocol_stack_group {
/* dfx stats */
bool latency_start;
uint64_t call_alloc_fail;
+ pthread_spinlock_t wakeup_list_lock;
+ struct wakeup_poll *wakeup_list __rte_cache_aligned;
};
long get_stack_tid(void);
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
-struct protocol_stack *get_minconn_protocol_stack(void);
+struct protocol_stack *get_bind_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
int32_t init_protocol_stack(void);
@@ -96,6 +102,7 @@ int32_t stack_broadcast_close(int32_t fd);
/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
int32_t stack_broadcast_listen(int32_t fd, int backlog);
+int32_t stack_single_listen(int32_t fd, int32_t backlog);
/* ergodic the protocol stack thread to find the connection, because all threads are listening */
int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
diff --git a/src/lstack/include/lstack_stack_stat.h b/src/lstack/include/lstack_stack_stat.h
index 2c3bf8f..e152fe6 100644
--- a/src/lstack/include/lstack_stack_stat.h
+++ b/src/lstack/include/lstack_stack_stat.h
@@ -24,4 +24,7 @@ void stack_stat_init(void);
int32_t handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode);
uint64_t get_current_time(void);
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info);
+void register_wakeup(struct wakeup_poll *wakeup);
+void unregister_wakeup(struct wakeup_poll *wakeup);
+
#endif /* GAZELLE_STACK_STAT_H */
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 61bcd38..35e6b1e 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -37,12 +37,13 @@ union rpc_msg_arg {
socklen_t socklen;
size_t size;
};
+struct rpc_msg_pool;
struct rpc_msg {
pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
int32_t self_release; /* 0:msg handler release msg 1:msg sender release msg */
int64_t result; /* func return val */
lockless_queue_node queue_node;
- struct rte_mempool *pool;
+ struct rpc_msg_pool *pool;
rpc_msg_func func; /* msg handle func hook */
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
@@ -50,11 +51,9 @@ struct rpc_msg {
struct protocol_stack;
void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
-void rpc_call_addevent(struct protocol_stack *stack, void *sock);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index a83f41f..e9f9b91 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -20,16 +20,27 @@ extern "C" {
#include <poll.h>
#include <stdbool.h>
#include <semaphore.h>
+#include <pthread.h>
+#include <rte_common.h>
#include "lstack_protocol_stack.h"
+enum wakeup_type {
+ WAKEUP_EPOLL = 0,
+ WAKEUP_POLL,
+};
struct wakeup_poll {
+ /* stack thread read frequently */
+ sem_t event_sem __rte_cache_aligned;
+ enum wakeup_type type __rte_cache_aligned;
+ volatile bool have_kernel_event __rte_cache_aligned;
+ struct gazelle_wakeup_stat stat __rte_cache_aligned;
+ char pad __rte_cache_aligned;
+
bool init;
struct protocol_stack *bind_stack;
- sem_t event_sem;
-
- int32_t epollfd;
- bool have_kernel_fd;
+ int32_t epollfd; /* epoll kernel fd, ctl add into gazelle_kernel_event thread */
+ struct wakeup_poll *next;
/* poll */
struct pollfd *last_fds;
@@ -40,7 +51,8 @@ struct wakeup_poll {
/* epoll */
int32_t stack_fd_cnt[PROTOCOL_STACK_MAX];
struct protocol_stack *max_stack;
- struct list_node event_list; /* epoll temp use */
+ struct list_node event_list;
+ pthread_spinlock_t event_list_lock;
};
int32_t lstack_epoll_create(int32_t size);
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index 696dfb9..fdca602 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -16,6 +16,7 @@ kni_switch=0
low_power_mode=0
num_cpus="2"
+num_wakeup="3"
host_addr="192.168.1.10"
mask_addr="255.255.255.0"
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 382f3bc..7938520 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -39,10 +39,11 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
struct pbuf_custom *pc = NULL;
struct protocol_stack *stack = get_protocol_stack();
struct rte_mbuf *m = mbuf;
- uint16_t len;
+ uint16_t len, pkt_len;
+ pkt_len = (uint16_t)rte_pktmbuf_pkt_len(m);
while (m != NULL) {
- len = (uint16_t)rte_pktmbuf_pkt_len(m);
+ len = (uint16_t)rte_pktmbuf_data_len(m);
payload = rte_pktmbuf_mtod(m, void *);
pc = mbuf_to_pbuf(m);
pc->custom_free_function = gazelle_free_pbuf;
@@ -51,6 +52,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
stack->stats.rx_allocmbuf_fail++;
break;
}
+ next->tot_len = pkt_len;
#if CHECKSUM_CHECK_IP_HW || CHECKSUM_CHECK_TCP_HW
next->ol_flags = m->ol_flags;
#endif
@@ -71,7 +73,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
if (ret != ERR_OK) {
LSTACK_LOG(ERR, LSTACK, "eth_dev_recv: failed to handle rx pbuf ret=%d\n", ret);
stack->stats.rx_drop++;
- pbuf_free(head);
}
}
}
@@ -181,7 +182,7 @@ int32_t ethdev_init(struct protocol_stack *stack)
if (use_ltran()) {
stack->rx_ring_used = 0;
- int32_t ret = fill_mbuf_to_ring(stack->rx_pktmbuf_pool, stack->rx_ring, VDEV_RX_QUEUE_SZ - 1);
+ int32_t ret = fill_mbuf_to_ring(stack->rx_pktmbuf_pool, stack->rx_ring, RING_SIZE(VDEV_RX_QUEUE_SZ));
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "fill mbuf to rx_ring failed ret=%d\n", ret);
return ret;
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 5a4e86a..287ac8f 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -42,14 +42,14 @@ static uint32_t ltran_rx_poll(struct protocol_stack *stack, struct rte_mbuf **pk
uint32_t nr_pkts;
struct rte_mbuf *free_buf[DPDK_PKT_BURST_SIZE];
- rcvd_pkts = rte_ring_en_dequeue_burst(stack->rx_ring, (void **)pkts, max_mbuf);
+ rcvd_pkts = gazelle_ring_sc_dequeue(stack->rx_ring, (void **)pkts, max_mbuf);
stack->rx_ring_used += rcvd_pkts;
if (unlikely(stack->rx_ring_used >= USED_RX_PKTS_WATERMARK)) {
- uint32_t free_cnt = LWIP_MIN(stack->rx_ring_used, DPDK_PKT_BURST_SIZE);
+ uint32_t free_cnt = LWIP_MIN(stack->rx_ring_used, RING_SIZE(DPDK_PKT_BURST_SIZE));
int32_t ret = gazelle_alloc_pktmbuf(stack->rx_pktmbuf_pool, (struct rte_mbuf **)free_buf, free_cnt);
if (likely(ret == 0)) {
- nr_pkts = rte_ring_en_enqueue_bulk(stack->rx_ring, (void **)free_buf, free_cnt);
+ nr_pkts = gazelle_ring_sp_enqueue(stack->rx_ring, (void **)free_buf, free_cnt);
stack->rx_ring_used -= nr_pkts;
} else {
stack->stats.rx_allocmbuf_fail++;
@@ -72,14 +72,14 @@ static uint32_t ltran_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pk
do {
if (unlikely(stack->tx_ring_used >= INUSE_TX_PKTS_WATERMARK)) {
- uint32_t free_pkts = rte_ring_en_dequeue_burst(stack->tx_ring, (void **)free_buf, stack->tx_ring_used);
+ uint32_t free_pkts = gazelle_ring_sc_dequeue(stack->tx_ring, (void **)free_buf, stack->tx_ring_used);
for (uint32_t i = 0; i < free_pkts; i++) {
rte_pktmbuf_free(free_buf[i]);
}
stack->tx_ring_used -= free_pkts;
}
- sent_pkts += rte_ring_en_enqueue_bulk(stack->tx_ring, (void **)(&pkts[sent_pkts]), nr_pkts - sent_pkts);
+ sent_pkts += gazelle_ring_sp_enqueue(stack->tx_ring, (void **)(&pkts[sent_pkts]), nr_pkts - sent_pkts);
} while ((sent_pkts < nr_pkts) && (ENQUEUE_RING_RETRY_TIMEOUT > sys_now() - tbegin) && get_register_state());
stack->tx_ring_used += sent_pkts;
@@ -128,7 +128,7 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
}
do {
- (void)rte_ring_en_dequeue_burst(stack->reg_ring, free_buf, VDEV_REG_QUEUE_SZ);
+ (void)gazelle_ring_sc_dequeue(stack->reg_ring, free_buf, VDEV_REG_QUEUE_SZ);
if (get_reg_ring_free_count(stack->reg_ring) == 0) {
continue;
@@ -144,7 +144,7 @@ int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
}
free_buf[0] = tmp_buf;
- sent_pkts = rte_ring_en_enqueue_bulk(stack->reg_ring, free_buf, 1);
+ sent_pkts = gazelle_ring_sp_enqueue(stack->reg_ring, free_buf, 1);
} while ((sent_pkts < 1) && (ENQUEUE_RING_RETRY_TIMEOUT > sys_now() - tbegin) && get_register_state());
if (sent_pkts == 1) {
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 8d71966..7db1adc 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -546,32 +546,28 @@ static void gazelle_print_lstack_stat_brief(struct gazelle_stat_lstack_total *st
static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
{
printf("\n------ stack tid: %6u ------\n", lstack_stat->tid);
- printf("rx_pkts: %-20"PRIu64" ", lstack_stat->data.pkts.rx);
- printf("rx_drop: %-20"PRIu64" ", lstack_stat->data.pkts.rx_drop);
- printf("rx_allocmbuf_fail: %-10"PRIu64"\n", lstack_stat->data.pkts.rx_allocmbuf_fail);
- printf("tx_pkts: %-20"PRIu64" ", lstack_stat->data.pkts.tx);
- printf("tx_drop: %-20"PRIu64" ", lstack_stat->data.pkts.tx_drop);
- printf("tx_allocmbuf_fail: %-10"PRIu64"\n", lstack_stat->data.pkts.tx_allocmbuf_fail);
- printf("app_read: %-19"PRIu64" ", lstack_stat->data.pkts.app_read_cnt);
- printf("read_lwip: %-18"PRIu64" ", lstack_stat->data.pkts.read_lwip_cnt);
- printf("read_lwip_drop: %-13"PRIu64" \n", lstack_stat->data.pkts.read_lwip_drop);
- printf("app_write: %-18"PRIu64" ", lstack_stat->data.pkts.app_write_cnt);
- printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.write_lwip_cnt);
- printf("app_get_idlefail: %-11"PRIu64" \n", lstack_stat->data.pkts.app_write_idlefail);
- printf("app_write_drop: %-13"PRIu64" ", lstack_stat->data.pkts.app_write_drop);
- printf("write_lwip_drop: %-12"PRIu64" ", lstack_stat->data.pkts.write_lwip_drop);
- printf("app_write_idlebuf: %-10"PRIu16" \n", lstack_stat->data.pkts.send_idle_ring_cnt);
- printf("event_list: %-17"PRIu64" ", lstack_stat->data.pkts.event_list);
- printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list);
+ printf("rx_pkts: %-20"PRIu64" ", lstack_stat->data.pkts.stack_stat.rx);
+ printf("rx_drop: %-20"PRIu64" ", lstack_stat->data.pkts.stack_stat.rx_drop);
+ printf("rx_allocmbuf_fail: %-10"PRIu64"\n", lstack_stat->data.pkts.stack_stat.rx_allocmbuf_fail);
+ printf("tx_pkts: %-20"PRIu64" ", lstack_stat->data.pkts.stack_stat.tx);
+ printf("tx_drop: %-20"PRIu64" ", lstack_stat->data.pkts.stack_stat.tx_drop);
+ printf("tx_allocmbuf_fail: %-10"PRIu64"\n", lstack_stat->data.pkts.stack_stat.tx_allocmbuf_fail);
+ printf("app_read: %-19"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_read_cnt);
+ printf("read_lwip: %-18"PRIu64" ", lstack_stat->data.pkts.stack_stat.read_lwip_cnt);
+ printf("read_lwip_drop: %-13"PRIu64" \n", lstack_stat->data.pkts.stack_stat.read_lwip_drop);
+ printf("app_write: %-18"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_write_cnt);
+ printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt);
+ printf("app_get_idlefail: %-11"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_idlefail);
+ printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt);
+ printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt);
printf("conn_num: %-19"PRIu16" \n", lstack_stat->data.pkts.conn_num);
- printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.wakeup_events);
- printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.app_events);
- printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.read_null);
+ printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.stack_stat.wakeup_events);
+ printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_events);
+ printf("read_null: %-18"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.read_null);
printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
- printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
- printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
- printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list);
+ printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.stack_stat.call_null);
+ printf("send_self_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.stack_stat.send_self_rpc);
}
static void gazelle_print_lstack_stat_detail(struct gazelle_stack_dfx_data *lstack_stat,
@@ -873,8 +869,8 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
printf("Active Internet connections (servers and established)\n");
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
- printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt Local Address "
- " Foreign Address State\n");
+ printf("No. Proto recv_cnt recv_ring in_send send_ring sem_cnt fd Local Address "
+ " Foreign Address State\n");
uint32_t unread_pkts = 0;
uint32_t unsend_pkts = 0;
for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) {
@@ -883,13 +879,13 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
rip.s_addr = conn_info->rip;
lip.s_addr = conn_info->lip;
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
- printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-10u%-11u%-9u%-11u%-9d%-7d%s:%hu\t %s:%hu\t %s\n", i, conn_info->recv_cnt,
conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->sem_cnt,
- inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
+ conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-50u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-57u%s:%hu\t 0.0.0.0:*\t\t LISTEN\n", i, conn_info->recv_cnt,
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
diff --git a/src/ltran/ltran_forward.c b/src/ltran/ltran_forward.c
index b264ad3..776692d 100644
--- a/src/ltran/ltran_forward.c
+++ b/src/ltran/ltran_forward.c
@@ -92,7 +92,7 @@ static __rte_always_inline void backup_bufs_enque_rx_ring(struct gazelle_stack *
struct rte_mbuf *free_buf[RING_MAX_SIZE];
flush_cnt = (stack->backup_pkt_cnt < RING_MAX_SIZE) ? stack->backup_pkt_cnt : RING_MAX_SIZE;
- free_cnt = rte_ring_cn_dequeue_burst(stack->rx_ring, (void **)free_buf, flush_cnt);
+ free_cnt = gazelle_ring_read(stack->rx_ring, (void **)free_buf, flush_cnt);
for (uint32_t j = 0; j < free_cnt; j++) {
index = (stack->backup_start + j) % backup_size;
@@ -102,7 +102,7 @@ static __rte_always_inline void backup_bufs_enque_rx_ring(struct gazelle_stack *
stack->stack_stats.rx += free_cnt;
stack->backup_pkt_cnt -= free_cnt;
stack->backup_start = (stack->backup_start + free_cnt) % backup_size;
- rte_ring_cn_enqueue(stack->rx_ring);
+ gazelle_ring_read_over(stack->rx_ring);
}
static __rte_always_inline void pktbufs_move_to_backup_bufs(struct gazelle_stack *stack, struct rte_mbuf **mbuf,
@@ -135,7 +135,7 @@ static __rte_always_inline uint32_t pkt_bufs_enque_rx_ring(struct gazelle_stack
struct rte_mbuf **cl_buffer = stack->pkt_buf;
struct rte_mbuf *free_buf[GAZELLE_PACKET_READ_SIZE];
- free_cnt = rte_ring_cn_dequeue_burst(stack->rx_ring, (void **)free_buf, stack->pkt_cnt);
+ free_cnt = gazelle_ring_read(stack->rx_ring, (void **)free_buf, stack->pkt_cnt);
stack->stack_stats.rx += free_cnt;
/* this prefetch and copy code, only 50~60 instruction, but never spend less than 70 cycle.
@@ -187,7 +187,7 @@ static __rte_always_inline uint32_t pkt_bufs_enque_rx_ring(struct gazelle_stack
}
if (likely(free_cnt != 0)) {
- rte_ring_cn_enqueue(stack->rx_ring);
+ gazelle_ring_read_over(stack->rx_ring);
}
return free_cnt;
@@ -520,14 +520,14 @@ static __rte_always_inline void tcp_hash_table_handle(struct gazelle_stack *stac
return;
}
- uint32_t num = rte_ring_cn_dequeue_burst(stack->reg_ring, pkts, PACKET_READ_SIZE);
+ uint32_t num = gazelle_ring_read(stack->reg_ring, pkts, PACKET_READ_SIZE);
for (uint32_t i = 0; i < num; i++) {
tcp_hash_table_modify(stack, pkts[i]);
pkts[i] = NULL;
}
- rte_ring_cn_enqueue(stack->reg_ring);
+ gazelle_ring_read_over(stack->reg_ring);
if (pthread_mutex_unlock(&sock_htable->mlock) != 0) {
LTRAN_WARN("write tcp_htable: unlock failed, errno %d\n", errno);
}
@@ -675,7 +675,7 @@ static __rte_always_inline void downstream_forward_one(struct gazelle_stack *sta
uint32_t used_cnt;
struct rte_mbuf *used_pkts[GAZELLE_PACKET_READ_SIZE];
- used_cnt = rte_ring_cn_dequeue_burst(stack->tx_ring, (void **)used_pkts, GAZELLE_PACKET_READ_SIZE);
+ used_cnt = gazelle_ring_read(stack->tx_ring, (void **)used_pkts, GAZELLE_PACKET_READ_SIZE);
if (used_cnt == 0) {
return;
}
@@ -686,7 +686,7 @@ static __rte_always_inline void downstream_forward_one(struct gazelle_stack *sta
if (ret != 0) {
/* free pkts that not have be sent. */
LTRAN_ERR("down alloc error, rx_pkts:%u ret=%d.\n", used_cnt, ret);
- rte_ring_cn_enqueue(stack->tx_ring);
+ gazelle_ring_read_over(stack->tx_ring);
stack->stack_stats.tx_drop += used_cnt;
rte_exit(EXIT_FAILURE, "down alloc error\n");
}
@@ -696,7 +696,7 @@ static __rte_always_inline void downstream_forward_one(struct gazelle_stack *sta
tx_bytes += used_pkts[tx_pkts]->data_len;
stack->stack_stats.tx_bytes += used_pkts[tx_pkts]->data_len;
}
- rte_ring_cn_enqueue(stack->tx_ring);
+ gazelle_ring_read_over(stack->tx_ring);
/* send packets anyway. */
tx_pkts = 0;
diff --git a/src/ltran/ltran_stat.c b/src/ltran/ltran_stat.c
index 7080424..c6805a6 100644
--- a/src/ltran/ltran_stat.c
+++ b/src/ltran/ltran_stat.c
@@ -25,6 +25,7 @@
#include "gazelle_dfx_msg.h"
#include "ltran_timer.h"
#include "ltran_ethdev.h"
+#include "dpdk_common.h"
#include "ltran_forward.h"
static uint64_t g_start_time_stamp = 0;
@@ -32,25 +33,11 @@ static int32_t g_start_latency = GAZELLE_OFF;
volatile int32_t g_ltran_stop_flag = GAZELLE_FALSE;
static struct statistics g_statistics;
-static uint32_t get_rx_ring_count(const struct rte_ring *r)
-{
- return rte_ring_count(r);
-}
-
uint64_t get_start_time_stamp(void)
{
return g_start_time_stamp;
}
-static uint32_t get_tx_ring_count(const struct rte_ring *r)
-{
- uint32_t prod_tail = r->prod.tail;
- uint32_t cons_head = r->cons.head;
-
- uint32_t count = (cons_head - prod_tail) & r->mask;
- return (count > r->capacity) ? r->capacity : count;
-}
-
void set_start_latency_flag(int32_t flag)
{
struct gazelle_instance_mgr *instance_mgr = get_instance_mgr();
@@ -203,8 +190,8 @@ static int32_t gazelle_filling_lstack_stat_total(struct gazelle_stat_lstack_tota
stat->latency_pkts = stack->stack_stats.latency_pkts;
stat->latency_total = stack->stack_stats.latency_total;
stat->reg_ring_cnt = rte_ring_cn_count(stack->reg_ring);
- stat->rx_ring_cnt = get_rx_ring_count(stack->rx_ring);
- stat->tx_ring_cnt = get_tx_ring_count(stack->tx_ring);
+ stat->rx_ring_cnt = gazelle_ring_readover_count(stack->rx_ring);
+ stat->tx_ring_cnt = gazelle_ring_readable_count(stack->tx_ring);
return GAZELLE_OK;
}
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。