代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/dpu-utilities 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 435916d197bfce059e3afbca0975315fc5b1662f Mon Sep 17 00:00:00 2001
From: yangxin <245051644@qq.com>
Date: Fri, 10 Feb 2023 17:02:05 +0800
Subject: [PATCH 4/5] Add udsproxy.
Signed-off-by: yangxin <245051644@qq.com>
---
qtfs/README.md | 7 +-
qtfs/comm.h | 8 +-
qtfs/conn.c | 9 +-
qtfs/conn.h | 1 +
qtfs/ipc/Makefile | 22 +
qtfs/ipc/uds_connector.c | 129 +++++
qtfs/ipc/uds_event.c | 999 +++++++++++++++++++++++++++++++++
qtfs/ipc/uds_event.h | 64 +++
qtfs/ipc/uds_main.c | 556 ++++++++++++++++++
qtfs/ipc/uds_main.h | 141 +++++
qtfs/ipc/uds_module.h | 19 +
qtfs/misc.c | 7 +
qtfs/qtfs/sb.c | 6 +-
qtfs/qtfs_server/Makefile | 17 +-
qtfs/qtfs_server/fsops.c | 5 +-
qtfs/qtfs_server/qtfs-server.c | 11 +-
qtfs/qtfs_server/user_engine.c | 26 +-
qtfs/qtinfo/qtinfo.c | 77 ++-
qtfs/qtsock.c | 332 +++++++++++
19 files changed, 2406 insertions(+), 30 deletions(-)
create mode 100644 qtfs/ipc/Makefile
create mode 100644 qtfs/ipc/uds_connector.c
create mode 100644 qtfs/ipc/uds_event.c
create mode 100644 qtfs/ipc/uds_event.h
create mode 100644 qtfs/ipc/uds_main.c
create mode 100644 qtfs/ipc/uds_main.h
create mode 100644 qtfs/ipc/uds_module.h
create mode 100644 qtfs/qtsock.c
diff --git a/qtfs/README.md b/qtfs/README.md
index 0cbc2e1..19987e0 100644
--- a/qtfs/README.md
+++ b/qtfs/README.md
@@ -24,6 +24,7 @@ qtfs的特性:
## 安装教程
目录说明:
++ **ipc**: 跨主机unix domain socket协同组件,在该目录下编译udsproxyd二进制和libudsproxy.so库。
+ **qtfs**: 客户端内核模块相关代码,直接在该目录下编译客户端ko。
+ **qtfs_server**: 服务端内核模块相关代码,直接在该目录下编译服务端ko和相关程序。
+ **qtinfo**: 诊断工具,支持查询文件系统的工作状态以及修改log级别等。
@@ -34,19 +35,23 @@ qtfs的特性:
1. 要求内核版本在5.10或更高版本。
2. 安装内核开发包:yum install kernel-devel。
+ 3. 假设host服务器ip为192.168.10.10,dpu为192.168.10.11
服务端安装:
1. cd qtfs_server
2. make clean && make
3. insmod qtfs_server.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN
- 4. ./engine 4096 16
+ 4. nohup ./engine 16 1 192.168.10.10 12121 192.168.10.11 12121 2>&1 &
客户端安装:
1. cd qtfs
2. make clean && make
3. insmod qtfs.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN
+ 4. cd ../ipc/
+ 5. make clean && make && make install
+ 6. nohup udsproxyd 1 192.168.10.11 12121 192.168.10.10 12121 2>&1 &
## 使用说明
diff --git a/qtfs/comm.h b/qtfs/comm.h
index 901552c..2e562bb 100644
--- a/qtfs/comm.h
+++ b/qtfs/comm.h
@@ -3,6 +3,9 @@
extern struct qtinfo *qtfs_diag_info;
+#define QTFS_CLIENT_DEV "/dev/qtfs_client"
+#define QTFS_SERVER_DEV "/dev/qtfs_server"
+
#define QTFS_IOCTL_MAGIC 'Q'
enum {
_QTFS_IOCTL_EXEC,
@@ -18,6 +21,7 @@ enum {
_QTFS_IOCTL_LOG_LEVEL,
_QTFS_IOCTL_EPOLL_SUPPORT,
+ _QTFS_IOCTL_UDS_PROXY_PID,
};
#define QTFS_IOCTL_THREAD_INIT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EXEC)
@@ -31,6 +35,7 @@ enum {
#define QTFS_IOCTL_CLEARALL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_CLEARALL)
#define QTFS_IOCTL_LOGLEVEL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_LOG_LEVEL)
#define QTFS_IOCTL_EPOLL_SUPPORT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EPOLL_SUPPORT)
+#define QTFS_IOCTL_UDS_PROXY_PID _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_UDS_PROXY_PID)
#define QTINFO_MAX_EVENT_TYPE 36 // look qtreq_type at req.h
#define QTFS_FUNCTION_LEN 64
@@ -119,6 +124,7 @@ enum qtinfo_cnts {
};
#endif
+#if (defined(QTFS_CLIENT) || defined(client) || defined(QTFS_SERVER) || defined(server))
// for connection state machine
typedef enum {
QTCONN_INIT,
@@ -159,7 +165,7 @@ struct qtinfo {
#define QTINFO_STATE(state) ((state == QTCONN_INIT) ? "INIT" : \
((state == QTCONN_CONNECTING) ? "CONNECTING" : \
((state == QTCONN_ACTIVE) ? "ACTIVE" : "UNKNOWN")))
-
+#endif
//ko compile
#if (defined(QTFS_CLIENT) || defined(client))
static inline void qtinfo_clear(void)
diff --git a/qtfs/conn.c b/qtfs/conn.c
index 26930b1..c84c85c 100644
--- a/qtfs/conn.c
+++ b/qtfs/conn.c
@@ -32,6 +32,7 @@ struct qtfs_sock_var_s *qtfs_epoll_var = NULL;
struct socket *qtfs_server_main_sock = NULL;
struct qtfs_server_userp_s *qtfs_userps = NULL;
#endif
+int qtfs_uds_proxy_pid = -1;
#define QTFS_EPOLL_THREADIDX (QTFS_MAX_THREADS + 4)
@@ -76,6 +77,10 @@ static int qtfs_conn_sockserver_init(struct qtfs_sock_var_s *pvar)
{
struct socket *sock;
int ret;
+ struct sockaddr_in saddr;
+ saddr.sin_family = AF_INET;
+ saddr.sin_port = htons(pvar->port);
+ saddr.sin_addr.s_addr = in_aton(pvar->addr);
if (!QTCONN_IS_EPOLL_CONN(pvar) && qtfs_server_main_sock != NULL) {
qtfs_info("qtfs server main sock is %lx, valid or out-of-date?", (unsigned long)qtfs_server_main_sock);
@@ -147,10 +152,6 @@ static int qtfs_conn_sockclient_init(struct qtfs_sock_var_s *pvar)
{
struct socket *sock;
int ret;
- struct sockaddr_in saddr;
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons(pvar->port);
- saddr.sin_addr.s_addr = in_aton(pvar->addr);
ret = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, 0, &sock);
if (ret) {
diff --git a/qtfs/conn.h b/qtfs/conn.h
index db590fc..742def4 100644
--- a/qtfs/conn.h
+++ b/qtfs/conn.h
@@ -26,6 +26,7 @@ extern char qtfs_log_level[QTFS_LOGLEVEL_STRLEN];
extern int log_level;
extern struct qtinfo *qtfs_diag_info;
extern bool qtfs_epoll_mode;
+extern int qtfs_uds_proxy_pid;
#define qtfs_conn_get_param(void) _qtfs_conn_get_param(__func__)
diff --git a/qtfs/ipc/Makefile b/qtfs/ipc/Makefile
new file mode 100644
index 0000000..47e74ad
--- /dev/null
+++ b/qtfs/ipc/Makefile
@@ -0,0 +1,22 @@
+all: udsproxyd libudsproxy.so
+
+udsproxyd: uds_event.o uds_main.o
+ gcc -g -O2 -o udsproxyd $^ -I../
+
+uds_event.o:
+ cc -g -c -o uds_event.o uds_event.c
+
+uds_main.o:
+ cc -g -c -o uds_main.o uds_main.c
+
+libudsproxy.so:
+ gcc -g -O2 -o libudsproxy.so uds_connector.c -fPIC --shared
+
+install:
+ yes | cp udsproxyd /usr/bin/
+ yes | cp libudsproxy.so /usr/lib64/
+
+clean:
+ @rm -rf *.o udsproxyd libudsproxy.so
+
+.PHONY: clean
diff --git a/qtfs/ipc/uds_connector.c b/qtfs/ipc/uds_connector.c
new file mode 100644
index 0000000..3c46ce5
--- /dev/null
+++ b/qtfs/ipc/uds_connector.c
@@ -0,0 +1,129 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <time.h>
+#include <dlfcn.h>
+#include <sys/types.h>
+
+#include "uds_module.h"
+
+#define uds_log(info, ...) \
+ do { \
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ } while (0);
+
+#define uds_err(info, ...) \
+ do { \
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ } while (0);
+
+static unsigned short uds_conn_get_sock_type(int sockfd)
+{
+ unsigned short type;
+ int len = 2;
+ int ret = getsockopt(sockfd, SOL_SOCKET, SO_TYPE, &type, &len);
+ if (ret < 0) {
+ uds_err("get sock type failed, fd:%d", sockfd);
+ return (unsigned short)-1;
+ }
+ uds_log("fd:%d type:%d", sockfd, type);
+ return type;
+}
+
+static int uds_conn_whitelist_check(const char *path)
+{
+ return 1;
+}
+
+int connect(int fd, const struct sockaddr *addrarg, socklen_t len)
+{
+ int sock_fd;
+ typeof(connect) *libcconnect = NULL;
+ int libcret;
+ const struct sockaddr_un *addr = (const struct sockaddr_un *)addrarg;
+
+ if (libcconnect == NULL) {
+ libcconnect = dlsym(((void *) - 1l), "connect");
+ if (libcconnect == NULL) {
+ uds_err("can't find connect by dlsym.");
+ return -1;
+ }
+ }
+
+ libcret = (*libcconnect)(fd, addrarg, len);
+ if (libcret == 0 || addr->sun_family != AF_UNIX) {
+ // 如果本地connect成功,或者非UNIX DOMAIN SOCKET,都直接返回即可
+ return libcret;
+ }
+
+ uds_log("enter uds connect fd:%d sunpath:%s family:%d len:%d connect function:0x%lx", fd, addr->sun_path,
+ addr->sun_family, len, libcconnect);
+ // 本地未连接,且是uds链接
+ if (!uds_conn_whitelist_check(addr->sun_path)) {
+ uds_err("path:%s not in white list", addr->sun_path);
+ return libcret;
+ }
+
+ // 尝试远端链接
+ do {
+ int ret;
+ struct uds_proxy_remote_conn_req remoteconn;
+ struct uds_proxy_remote_conn_rsp remotersp;
+ struct sockaddr_un proxy = {.sun_family = AF_UNIX};
+ sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ uds_err("create socket failed");
+ return libcret;
+ }
+
+ strncpy(proxy.sun_path, UDS_BUILD_CONN_ADDR, sizeof(proxy.sun_path));
+ if ((*libcconnect)(sock_fd, (struct sockaddr *)&proxy, sizeof(struct sockaddr_un)) < 0) {
+ uds_err("can't connect to uds proxy: %s", UDS_BUILD_CONN_ADDR);
+ goto err_end;
+ }
+ // 这里type需要是第一个入参fd的type
+ remoteconn.type = uds_conn_get_sock_type(fd);
+ if (remoteconn.type == (unsigned short)-1) {
+ remoteconn.type = SOCK_STREAM;
+ }
+ memset(remoteconn.sun_path, 0, sizeof(remoteconn.sun_path));
+ strncpy(remoteconn.sun_path, addr->sun_path, sizeof(remoteconn.sun_path));
+ ret = send(sock_fd, &remoteconn, sizeof(remoteconn), 0);
+ if (ret <= 0) {
+ uds_err("send remote connect request failed, ret:%d err:%s", ret, strerror(errno));
+ goto err_end;
+ }
+ ret = recv(sock_fd, &remotersp, sizeof(remotersp), MSG_WAITALL);
+ if (ret <= 0) {
+ uds_err("recv remote connect replay failed, ret:%d err:%s", ret, strerror(errno));
+ goto err_end;
+ }
+ if (remotersp.ret == 0) {
+ goto err_end;
+ }
+ } while(0);
+
+ close(sock_fd);
+ return (*libcconnect)(fd, addrarg, len);
+
+err_end:
+ close(sock_fd);
+ return libcret;
+}
diff --git a/qtfs/ipc/uds_event.c b/qtfs/ipc/uds_event.c
new file mode 100644
index 0000000..ff4d79b
--- /dev/null
+++ b/qtfs/ipc/uds_event.c
@@ -0,0 +1,999 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include "dirent.h"
+
+#include "uds_main.h"
+#include "uds_event.h"
+
+int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+
+int uds_event_module_init(struct uds_event_global_var *p)
+{
+ p->msg_controllen = UDS_EVENT_BUFLEN;
+ p->iov_len = UDS_EVENT_BUFLEN;
+ p->buflen = UDS_EVENT_BUFLEN;
+ p->msg_controlsendlen = UDS_EVENT_BUFLEN;
+ p->iov_sendlen = UDS_EVENT_BUFLEN;
+
+ p->msg_control = (char *)malloc(p->msg_controllen);
+ if (p->msg_control == NULL) {
+ uds_err("malloc msg control buf failed.");
+ p->msg_controllen = 0;
+ return EVENT_ERR;
+ }
+ p->msg_control_send = (char *)malloc(p->msg_controlsendlen);
+ if (p->msg_control_send == NULL) {
+ goto free1;
+ }
+ p->iov_base = (char *)malloc(p->iov_len);
+ if (p->iov_base == NULL) {
+ uds_err("malloc iov base failed.");
+ goto free2;
+ }
+ p->iov_base_send = (char *)malloc(p->iov_sendlen);
+ if (p->iov_base_send == NULL) {
+ goto free3;
+ }
+ p->buf = (char *)malloc(p->buflen);
+ if (p->buf == NULL) {
+ uds_err("malloc buf failed.");
+ goto free4;
+ }
+ return EVENT_OK;
+
+free4:
+ free(p->iov_base_send);
+ p->iov_base_send = NULL;
+
+free3:
+ free(p->iov_base);
+ p->iov_base = NULL;
+
+free2:
+ free(p->msg_control_send);
+ p->msg_control_send = NULL;
+
+free1:
+ free(p->msg_control);
+ p->msg_control = NULL;
+ return EVENT_ERR;
+}
+
+void uds_event_module_fini(struct uds_event_global_var *p)
+{
+ if (p->msg_control != NULL) {
+ free(p->msg_control);
+ p->msg_control = NULL;
+ p->msg_controllen = 0;
+ }
+ if (p->msg_control_send != NULL) {
+ free(p->msg_control_send);
+ p->msg_control_send = NULL;
+ p->msg_controlsendlen = 0;
+ }
+ if (p->iov_base != NULL) {
+ free(p->iov_base);
+ p->iov_base = NULL;
+ p->iov_len = 0;
+ }
+ if (p->iov_base_send != NULL) {
+ free(p->iov_base_send);
+ p->iov_base_send = NULL;
+ p->iov_sendlen = 0;
+ }
+ if (p->buf != NULL) {
+ free(p->buf);
+ p->buf = NULL;
+ p->buflen = 0;
+ }
+ return;
+}
+
+int uds_event_pre_hook(struct uds_event_global_var *p_event_var)
+{
+ p_event_var->cur = 0;
+ memset(p_event_var->tofree, 0, sizeof(struct uds_event *) * UDS_EPOLL_MAX_EVENTS);
+ return 0;
+}
+
+int uds_event_post_hook(struct uds_event_global_var *p_event_var)
+{
+ for (int i = 0; i < p_event_var->cur; i++) {
+ uds_log("event:%lx fd:%d free by its peer", p_event_var->tofree[i], p_event_var->tofree[i]->fd);
+ uds_del_event(p_event_var->tofree[i]);
+ }
+ return 0;
+}
+
+int uds_event_add_to_free(struct uds_event_global_var *p_event_var, struct uds_event *evt)
+{
+ if (evt->pipe == 1) {
+ uds_log("pipe event:%d no need to free peer", evt->fd);
+ return 0;
+ }
+
+ struct uds_event *peerevt = evt->peer;
+ if (peerevt == NULL) {
+ uds_err("peer event add to free is NULL, my fd:%d", evt->fd);
+ return -1;
+ }
+ peerevt->tofree = 1;
+ uds_log("event fd:%d addr:%lx add to free", peerevt->fd, peerevt);
+ p_event_var->tofree[p_event_var->cur] = peerevt;
+ p_event_var->cur++;
+ return 0;
+}
+
+int uds_event_pre_handler(struct uds_event *evt)
+{
+ if (evt->tofree == 1) {
+ uds_log("event fd:%d marked by peer as pending deletion", evt->fd);
+ return EVENT_ERR;
+ }
+ return EVENT_OK;
+}
+
+/*
+ * 1. accept local uds connect request
+ * 2. set new connection's event to build link step2
+ * 3. add new connection event to epoll list
+ */
+int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ int connfd;
+ struct uds_event *evt = (struct uds_event *)arg;
+ if (evt == NULL) {
+ uds_err("param is invalid.");
+ return EVENT_ERR;
+ }
+ connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+ if (connfd <= 0) {
+ uds_err("conn fd error:%d", connfd);
+ return EVENT_ERR;
+ }
+
+ uds_log("accept an new connection, fd:%d", connfd);
+
+ uds_add_event(connfd, NULL, uds_event_build_step2, NULL);
+ return EVENT_OK;
+}
+
+int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ if (evt == NULL) {
+ uds_err("param is invalid.");
+ return EVENT_ERR;
+ }
+ char buf[sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)] = {0};
+ struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)buf;
+ struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data;
+ int len;
+ memset(buf, 0, sizeof(buf));
+ len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL);
+ if (len == 0) {
+ uds_err("recv err msg:%d errno:%s", len, strerror(errno));
+ return EVENT_DEL;
+ }
+ if (len < 0) {
+ uds_err("read msg error:%d errno:%s", len, strerror(errno));
+ goto end;
+ }
+ if (msg->type != SOCK_STREAM && msg->type != SOCK_DGRAM) {
+ uds_err("uds type:%d invalid", msg->type);
+ return EVENT_ERR;
+ }
+
+ struct uds_conn_arg tcp = {
+ .cs = UDS_SOCKET_CLIENT,
+ };
+ int ret;
+ if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
+ uds_err("step2 build tcp connection failed, return:%d", ret);
+ goto end;
+ }
+ bdmsg->msgtype = MSGCNTL_UDS;
+ bdmsg->msglen = sizeof(struct uds_proxy_remote_conn_req);
+ if (write(tcp.connfd, bdmsg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)) < 0) {
+ uds_err("send msg to tcp failed");
+ goto end;
+ }
+
+ struct uds_proxy_remote_conn_req *priv = (void *)malloc(sizeof(struct uds_proxy_remote_conn_req));
+ if (priv == NULL) {
+ uds_err("malloc failed");
+ goto end;
+ }
+
+ uds_log("step2 recv sun path:%s, add step3 event fd:%d", msg->sun_path, tcp.connfd);
+ memcpy(priv, msg, sizeof(struct uds_proxy_remote_conn_req));
+ uds_add_event(tcp.connfd, evt, uds_event_build_step3, priv);
+
+end:
+ return EVENT_OK;
+}
+
+
+int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ struct uds_proxy_remote_conn_rsp msg;
+ int len;
+ memset(&msg, 0, sizeof(struct uds_proxy_remote_conn_rsp));
+ len = read(evt->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+ if (len <= 0) {
+ uds_err("read error len:%d", len);
+ if (len == 0)
+ goto event_del;
+ return EVENT_ERR;
+ }
+ if (msg.ret == EVENT_ERR) {
+ uds_log("get build ack:%d, failed", msg.ret);
+ goto event_del;
+ }
+
+ struct uds_proxy_remote_conn_req *udsmsg = (struct uds_proxy_remote_conn_req *)evt->priv;
+ struct uds_conn_arg uds;
+
+ memset(&uds, 0, sizeof(struct uds_conn_arg));
+ uds.cs = UDS_SOCKET_SERVER;
+ uds.udstype = udsmsg->type;
+ strncpy(uds.sun_path, udsmsg->sun_path, sizeof(uds.sun_path));
+ if (uds_build_unix_connection(&uds) < 0) {
+ uds_err("failed to build uds server sunpath:%s", uds.sun_path);
+ goto event_del;
+ }
+ uds_log("remote conn build success, build uds server type:%d sunpath:%s fd:%d OK this event suspend,",
+ udsmsg->type, udsmsg->sun_path, uds.sockfd);
+ uds_event_suspend(epfd, evt);
+ uds_add_event(uds.sockfd, evt, uds_event_build_step4, NULL);
+
+ msg.ret = 1;
+ write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+ return EVENT_OK;
+
+event_del:
+ msg.ret = 0;
+ write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+ free(evt->priv);
+ return EVENT_DEL;
+}
+
+int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ int connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+ if (connfd < 0) {
+ uds_err("accept connection failed fd:%d", connfd);
+ return EVENT_ERR;
+ }
+ struct uds_event *peerevt = (struct uds_event *)evt->peer;
+ peerevt->handler = uds_event_tcp2uds;
+ peerevt->peer = uds_add_event(connfd, peerevt, uds_event_uds2tcp, NULL);
+
+ uds_log("accept new connection fd:%d, peerfd:%d frontfd:%d peerfd:%d, peerevt(fd:%d) active now",
+ connfd, evt->peer->fd, peerevt->fd, peerevt->peer->fd, peerevt->fd);
+ uds_event_insert(epfd, peerevt);
+ return EVENT_DEL;
+}
+
+int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ int connfd = uds_sock_step_accept(evt->fd, AF_INET);
+ if (connfd <= 0) {
+ uds_err("tcp conn fd error:%d", connfd);
+ return EVENT_ERR;
+ }
+ uds_log("tcp listener event enter, new connection fd:%d.", connfd);
+
+ uds_add_event(connfd, NULL, uds_event_remote_build, NULL);
+ return 0;
+}
+
+int uds_build_connect2uds(struct uds_event *evt, struct uds_proxy_remote_conn_req *msg)
+{
+ struct uds_conn_arg targ;
+ int len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("recv failed, len:%d str:%s", len, strerror(errno));
+ return EVENT_ERR;
+ }
+
+ targ.cs = UDS_SOCKET_CLIENT;
+ targ.udstype = msg->type;
+ memset(targ.sun_path, 0, sizeof(targ.sun_path));
+ strncpy(targ.sun_path, msg->sun_path, sizeof(targ.sun_path));
+ if (uds_build_unix_connection(&targ) < 0) {
+ struct uds_proxy_remote_conn_rsp ack;
+ uds_err("can't connect to sun_path:%s", targ.sun_path);
+ ack.ret = EVENT_ERR;
+ write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
+ return EVENT_DEL;
+ }
+
+ evt->peer = uds_add_event(targ.connfd, evt, uds_event_uds2tcp, NULL);
+ evt->handler = uds_event_tcp2uds;
+
+ uds_log("build link req from tcp, sunpath:%s, type:%d, eventfd:%d peerfd:%d",
+ msg->sun_path, msg->type, targ.connfd, evt->fd);
+
+ struct uds_proxy_remote_conn_rsp ack;
+ ack.ret = EVENT_OK;
+
+ int ret = write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
+ if (ret <= 0) {
+ uds_err("apply ack failed, ret:%d", ret);
+ return EVENT_DEL;
+ }
+ return EVENT_OK;
+}
+
+int uds_build_pipe_proxy(struct uds_event *evt, struct uds_stru_scm_pipe *msg)
+{
+ int len = recv(evt->fd, msg, sizeof(struct uds_stru_scm_pipe), MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("recv failed, len:%d str:%s", len, strerror(errno));
+ return EVENT_ERR;
+ }
+ if (msg->dir != SCM_PIPE_READ && msg->dir != SCM_PIPE_WRITE) {
+ uds_err("invalid pipe dir:%d", msg->dir);
+ return EVENT_ERR;
+ }
+ uds_log("pipe proxy event fd:%d pipe fd:%d dir:%d", evt->fd, msg->srcfd, msg->dir);
+
+ if (msg->dir == SCM_PIPE_READ) {
+ evt->pipe = 1;
+ evt->peerfd = evt->fd;
+ evt->fd = msg->srcfd;
+ evt->handler = uds_event_pipe2tcp;
+ } else {
+ evt->pipe = 1;
+ evt->peerfd = msg->srcfd;
+ evt->handler = uds_event_tcp2pipe;
+ }
+ return EVENT_OK;
+}
+
+int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)p_event_var->iov_base;
+ struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data;
+ int len;
+ int ret = EVENT_OK;
+ memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+ len = recv(evt->fd, bdmsg, sizeof(struct uds_tcp2tcp), MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("read no msg from sock:%d, len:%d", evt->fd, len);
+ return EVENT_ERR;
+ }
+
+ switch (bdmsg->msgtype) {
+ case MSGCNTL_UDS:
+ ret = uds_build_connect2uds(evt, msg);
+ break;
+ case MSGCNTL_PIPE:
+ ret = uds_build_pipe_proxy(evt, (struct uds_stru_scm_pipe *)bdmsg->data);
+ break;
+ default:
+ uds_err("remote build not support msgtype %d now", bdmsg->msgtype);
+ break;
+ }
+ return ret;
+}
+
+static inline mode_t uds_msg_file_mode(int fd)
+{
+ struct stat st;
+ char path[32] = {0};
+ if (fstat(fd, &st) != 0) {
+ uds_err("get fd:%d fstat failed, errstr:%s", fd, strerror(errno));
+ }
+ if (S_ISFIFO(st.st_mode)) {
+ uds_log("fd:%d is fifo", fd);
+ }
+
+ return st.st_mode;
+}
+
+static int uds_msg_scm_regular_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+ int ret;
+ struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->buf;
+ struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&p_msg->data;
+ char *fdproc = calloc(1, UDS_PATH_MAX);
+ if (fdproc == NULL) {
+ uds_err("failed to calloc memory:%lx %lx", fdproc);
+ return EVENT_ERR;
+ }
+ sprintf(fdproc, "/proc/self/fd/%d", scmfd);
+ ret = readlink(fdproc, p_scmr->path, UDS_PATH_MAX);
+ if (ret < 0) {
+ uds_err("readlink:%s error, ret:%d, errstr:%s", fdproc, ret, strerror(errno));
+ free(fdproc);
+ close(scmfd);
+ return EVENT_ERR;
+ }
+ free(fdproc);
+ p_scmr->flags = fcntl(scmfd, F_GETFL, 0);
+ if (p_scmr->flags < 0) {
+ uds_err("fcntl get flags failed:%d error:%s", p_scmr->flags, strerror(errno));
+ close(scmfd);
+ return EVENT_ERR;
+ }
+ close(scmfd);
+ p_msg->msgtype = MSG_SCM_RIGHTS;
+ ret = write(tcpfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_msg_scmrights));
+ if (ret <= 0) {
+ uds_err("send scm rights msg to tcp failed, ret:%d", ret);
+ return EVENT_ERR;
+ }
+ uds_log("scm rights msg send to tcp, fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+ return EVENT_OK;
+}
+
+static int uds_msg_scm_fifo_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+#define FDPATH_LEN 32
+ int ret;
+ struct uds_tcp2tcp *p_get = (struct uds_tcp2tcp *)p_event_var->buf;
+ struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_get->data;
+ char path[FDPATH_LEN] = {0};
+ struct stat st;
+ p_get->msgtype = MSG_SCM_PIPE;
+ p_get->msglen = sizeof(struct uds_stru_scm_pipe);
+
+ sprintf(path, "/proc/self/fd/%d", scmfd);
+ lstat(path, &st);
+ if (st.st_mode & S_IRUSR) {
+ p_pipe->dir = SCM_PIPE_READ;
+ uds_log("scm rights recv read pipe fd:%d, mode:%o", scmfd, st.st_mode);
+ } else if (st.st_mode & S_IWUSR) {
+ p_pipe->dir = SCM_PIPE_WRITE;
+ uds_log("scm rights recv write pipe fd:%d, mode:%o", scmfd, st.st_mode);
+ } else {
+ uds_err("scm rights recv invalid pipe, mode:%o fd:%d", st.st_mode, scmfd);
+ return EVENT_ERR;
+ }
+ p_pipe->srcfd = scmfd;
+ ret = send(tcpfd, p_get, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe), 0);
+ if (ret <= 0) {
+ uds_err("send tar get msg failed, ret:%d errstr:%s", ret, strerror(errno));
+ return EVENT_ERR;
+ }
+ return EVENT_OK;
+}
+
+static int uds_msg_scmrights2tcp(struct cmsghdr *cmsg, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+ int scmfd;
+ mode_t mode;
+
+ memset(p_event_var->buf, 0, p_event_var->buflen);
+ memcpy(&scmfd, CMSG_DATA(cmsg), sizeof(scmfd));
+ if (scmfd <= 0) {
+ uds_err("recv invalid scm fd:%d", scmfd);
+ return EVENT_ERR;
+ }
+
+ mode = uds_msg_file_mode(scmfd);
+
+ switch (mode & S_IFMT) {
+ case S_IFREG:
+ uds_log("recv scmfd:%d from uds, is regular file", scmfd);
+ uds_msg_scm_regular_file(scmfd, tcpfd, p_event_var);
+ break;
+ case S_IFIFO:
+ uds_log("recv scmfd:%d from uds, is fifo", scmfd);
+ uds_msg_scm_fifo_file(scmfd, tcpfd, p_event_var);
+ break;
+ default:
+ uds_err("scm rights not support file mode:%o", mode);
+ break;
+ }
+
+ return EVENT_OK;
+}
+
+static int uds_msg_cmsg2tcp(struct msghdr *msg, struct uds_event *evt, struct uds_event_global_var *p_event_var)
+{
+ int cnt = 0;
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(msg);
+ while (cmsg != NULL) {
+ cnt ++;
+ uds_log("cmsg type:%d len:%d level:%d, tcpfd:%d", cmsg->cmsg_type,
+ cmsg->cmsg_len, cmsg->cmsg_level, evt->peer->fd);
+ switch (cmsg->cmsg_type) {
+ case SCM_RIGHTS:
+ uds_msg_scmrights2tcp(cmsg, evt->peer->fd, p_event_var);
+ break;
+ default:
+ uds_err("cmsg type:%d not support now", cmsg->cmsg_type);
+ break;
+ }
+ cmsg = CMSG_NXTHDR(msg, cmsg);
+ }
+ return cnt;
+}
+
+static int uds_msg_scmfd_combine_msg(struct msghdr *msg, struct cmsghdr **cmsg, int *controllen, int fd)
+{
+ struct cmsghdr *cnxt = NULL;
+ if (*cmsg == NULL) {
+ cnxt = CMSG_FIRSTHDR(msg);
+ } else {
+ cnxt = CMSG_NXTHDR(msg, *cmsg);
+ }
+ *cmsg = cnxt;
+ cnxt->cmsg_level = SOL_SOCKET;
+ cnxt->cmsg_type = SCM_RIGHTS;
+ cnxt->cmsg_len = CMSG_LEN(sizeof(fd));
+ memcpy(CMSG_DATA(cnxt), &fd, sizeof(fd));
+ *controllen = *controllen + cnxt->cmsg_len;
+ return EVENT_OK;
+}
+
+static int uds_msg_scmright_send_fd(int sock, int fd)
+{
+ char byte = 0;
+ struct iovec iov;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ char buf[CMSG_SPACE(sizeof(fd))];
+
+ // send at least one char
+ memset(&msg, 0, sizeof(msg));
+ iov.iov_base = &byte;
+ iov.iov_len = 1;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+
+
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(fd));
+ // Initialize the payload
+ memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(sock, &msg, 0) != iov.iov_len)
+ return -1;
+ return 0;
+}
+
+static int uds_msg_cmsg2uds(struct uds_tcp2tcp *msg, struct uds_event *evt)
+{
+ int scmfd = -1;
+ switch (msg->msgtype) {
+ case MSG_SCM_RIGHTS: {
+ struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&msg->data;
+ int ret;
+ int scmfd = open(p_scmr->path, p_scmr->flags);
+ if (scmfd < 0) {
+ uds_err("scm rights send fd failed, scmfd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+ return -1;
+ }
+ uds_log("scm send fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+ break;
+ }
+ default:
+ uds_err("msg type:%d not support.", msg->msgtype);
+ return -1;
+ }
+ return scmfd;
+}
+
+int uds_msg_tcp2uds_scm_pipe(struct uds_tcp2tcp *p_msg, struct uds_event *evt)
+{
+ int scmfd;
+ int fd[SCM_PIPE_NUM];
+ struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_msg->data;
+ int len = recv(evt->fd, p_pipe, p_msg->msglen, MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("recv data failed, len:%d", len);
+ return EVENT_DEL;
+ }
+ if (p_pipe->dir != SCM_PIPE_READ && p_pipe->dir != SCM_PIPE_WRITE) {
+ uds_err("scm pipe recv invalid pipe dir:%d, srcfd:%d", p_pipe->dir, p_pipe->srcfd);
+ return EVENT_ERR;
+ }
+ struct uds_conn_arg tcp = {
+ .cs = UDS_SOCKET_CLIENT,
+ };
+ int ret;
+ if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
+ uds_err("build tcp connection failed, return:%d", ret);
+ return EVENT_ERR;
+ }
+ if (pipe(fd) == -1) {
+ uds_err("pipe syscall error, strerr:%s", strerror(errno));
+ return EVENT_ERR;
+ }
+ if (p_pipe->dir == SCM_PIPE_READ) {
+ uds_log("send read pipe:%d to peer:%d", fd[SCM_PIPE_READ], evt->peer->fd);
+ scmfd = fd[SCM_PIPE_READ];
+ // read方向,proxy读取消息并转发,此代码处是远端,所以监听tcp换发给pipe write
+ uds_add_pipe_event(tcp.connfd, fd[SCM_PIPE_WRITE], uds_event_tcp2pipe, NULL);
+ } else {
+ uds_log("send write pipe:%d to peer:%d", fd[SCM_PIPE_WRITE], evt->peer->fd);
+ scmfd = fd[SCM_PIPE_WRITE];
+ // write方向,proxy读取远端代理pipe消息并转发,此处是远端,所以监听pipe read并转发给tcp
+ uds_add_pipe_event(fd[SCM_PIPE_READ], tcp.connfd, uds_event_pipe2tcp, NULL);
+ }
+
+ p_msg->msgtype = MSGCNTL_PIPE;
+ p_msg->msglen = sizeof(struct uds_stru_scm_pipe);
+ len = write(tcp.connfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe));
+ if (len <= 0) {
+ uds_err("send pipe msg failed, len:%d", len);
+ return EVENT_ERR;
+ }
+ uds_log("success to build pipe fd map, dir:%d srcfd:%d tcpfd:%d readfd:%d writefd:%d",
+ p_pipe->dir, p_pipe->srcfd, tcp.connfd, fd[SCM_PIPE_READ], fd[SCM_PIPE_WRITE]);
+
+ return scmfd;
+}
+
+int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+ int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
+ if (len <= 0) {
+ uds_err("read from tcp failed, len:%d str:%s", len, strerror(errno));
+ return EVENT_DEL;
+ }
+
+ uds_log("tcp:%d to pipe:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base);
+ int ret = write(evt->peerfd, p_event_var->iov_base, len);
+ if (ret <= 0) {
+ uds_err("write to pipe failed, fd:%d str:%s", evt->peerfd, strerror(errno));
+ return EVENT_DEL;
+ }
+ return EVENT_OK;
+}
+
+int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+ int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
+ if (len <= 0) {
+ uds_err("read from pipe failed, len:%d str:%s", len, strerror(errno));
+ return EVENT_DEL;
+ }
+
+ uds_log("pipe:%d to tcp:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base);
+ int ret = write(evt->peerfd, p_event_var->iov_base, len);
+ if (ret <= 0) {
+ uds_err("write to tcp failed, fd:%d str:%s", evt->peerfd, strerror(errno));
+ return EVENT_DEL;
+ }
+ return EVENT_OK;
+
+}
+
+int uds_msg_tcp_end_msg(int sock)
+{
+ struct uds_tcp2tcp end = {.msgtype = MSG_END, .msglen = 0,};
+ int ret = write(sock, &end, sizeof(struct uds_tcp2tcp));
+ if (ret <= 0) {
+ uds_err("write end msg failed, ret:%d fd:%d", ret, sock);
+ return EVENT_DEL;
+ }
+ return EVENT_OK;
+}
+
+void uds_msg_init_event_buf(struct uds_event_global_var *p)
+{
+ memset(p->iov_base, 0, p->iov_len);
+ memset(p->iov_base_send, 0, p->iov_sendlen);
+ memset(p->msg_control, 0, p->msg_controllen);
+ memset(p->msg_control_send, 0, p->msg_controlsendlen);
+ memset(p->buf, 0, p->buflen);
+ return;
+}
+
+#define TEST_BUFLEN 256
+int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ struct uds_event *evt = (struct uds_event *)arg;
+ struct iovec iov;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ int cmsgcnt = 0;
+ int len;
+
+ memset(&msg, 0, sizeof(msg));
+ iov.iov_base = p_event_var->iov_base + sizeof(struct uds_tcp2tcp);
+ iov.iov_len = p_event_var->iov_len - sizeof(struct uds_tcp2tcp);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+
+ msg.msg_control = p_event_var->msg_control;
+ msg.msg_controllen = p_event_var->msg_controllen;
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_len = p_event_var->msg_controllen;
+
+ len = recvmsg(evt->fd, &msg, 0);
+ if (len == 0) {
+ uds_err("recvmsg error, return:%d", len);
+ uds_event_add_to_free(p_event_var, evt);
+ return EVENT_DEL;
+ }
+ if (len < 0) {
+ uds_err("recvmsg error return val:%d", len);
+ return EVENT_ERR;
+ }
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (cmsg != NULL) {
+ uds_log("recvmsg cmsg len:%d cmsglen:%d iovlen:%d iov:%s cmsglevel:%d cmsgtype:%d",
+ len, cmsg->cmsg_len, iov.iov_len, iov.iov_base, cmsg->cmsg_level, cmsg->cmsg_type);
+ cmsgcnt = uds_msg_cmsg2tcp(&msg, evt, p_event_var);
+ if (len - cmsgcnt == 0)
+ goto endmsg;
+ }
+
+ struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
+ p_msg->msgtype = MSG_NORMAL;
+ p_msg->msglen = len;
+ int ret = write(evt->peer->fd, (void *)p_msg, p_msg->msglen + sizeof(struct uds_tcp2tcp));
+ if (ret <= 0) {
+ uds_err("write to peer:%d failed, retcode:%d len:%d", evt->peer->fd, ret, len);
+ return EVENT_ERR;
+ }
+
+ uds_log("write iov msg to tcp success, msgtype:%d ret:%d iovlen:%d recvlen:%d udsheadlen:%d msglen:%d msg:\n>>>>>>>\n%.*s\n<<<<<<<\n",
+ p_msg->msgtype, ret, iov.iov_len, len, sizeof(struct uds_tcp2tcp), p_msg->msglen, p_msg->msglen, p_msg->data);
+endmsg:
+ return uds_msg_tcp_end_msg(evt->peer->fd);
+}
+
+int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+#define MAX_FDS 64
+ int fds[MAX_FDS] = {0};
+ int fdnum = 0;
+ struct uds_event *evt = (struct uds_event *)arg;
+ struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
+ int ret;
+ int normal_msg_len = 0;
+ struct msghdr msg;
+ struct cmsghdr *cmsg = NULL;
+ struct iovec iov;
+ int msg_controllen = 0;
+
+ memset(&msg, 0, sizeof(msg));
+ iov.iov_base = p_event_var->iov_base_send;
+ iov.iov_len = 0;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_control = p_event_var->msg_control_send;
+ msg.msg_controllen = p_event_var->msg_controlsendlen;
+
+ while (1) {
+ int len = recv(evt->fd, p_msg, sizeof(struct uds_tcp2tcp), MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d.", len);
+ goto close_event;
+ }
+ uds_log("pmsg:%lx type:%d len:%d iov_base:%lx len:%d", p_msg, p_msg->msgtype, p_msg->msglen, p_event_var->iov_base, len);
+ if (p_msg->msgtype == MSG_END) {
+ break;
+ }
+ if (p_msg->msglen > p_event_var->iov_len - sizeof(struct uds_tcp2tcp) || p_msg->msglen <= 0) {
+ uds_err("pmsg len:%d is invalid, fd:%d peerfd:%d", p_msg->msglen, evt->fd, evt->peer->fd);
+ continue;
+ }
+ switch(p_msg->msgtype) {
+ case MSG_NORMAL:
+ if (normal_msg_len != 0) {
+ uds_err("normal msg repeat recv fd:%d", evt->fd);
+ goto err;
+ }
+ normal_msg_len = recv(evt->fd, p_event_var->iov_base_send, p_msg->msglen, MSG_WAITALL);
+ if (normal_msg_len <= 0) {
+ uds_err("recv msg error:%d fd:%d", len, evt->fd);
+ goto close_event;
+ }
+ iov.iov_len = normal_msg_len;
+ uds_log("recv normal msg len:%d str: \n>>>>>>>\n%.*s\n<<<<<<<", iov.iov_len, iov.iov_len, iov.iov_base);
+ break;
+ case MSG_SCM_RIGHTS: {
+ int len;
+ int scmfd;
+ struct uds_msg_scmrights *p_scm = (struct uds_msg_scmrights *) p_msg->data;
+ memset(p_scm->path, 0, sizeof(p_scm->path));
+ // SCM RIGHTS msg proc
+ len = recv(evt->fd, p_msg->data, p_msg->msglen, MSG_WAITALL);
+ if (len <= 0) {
+ uds_err("recv data failed len:%d", p_msg->msglen);
+ return EVENT_DEL;
+ }
+ scmfd = uds_msg_cmsg2uds(p_msg, evt);
+ if (scmfd == -1) {
+ goto err;
+ }
+ fds[fdnum++] = scmfd;
+ uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
+ break;
+ }
+ case MSG_SCM_PIPE: {
+ int scmfd;
+ scmfd = uds_msg_tcp2uds_scm_pipe(p_msg, evt);
+ if (scmfd == EVENT_DEL)
+ goto close_event;
+ if (scmfd < 0)
+ goto err;
+ fds[fdnum++] = scmfd;
+ uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
+ break;
+ }
+ default:
+ uds_err("recv unsupport msg type:%d event fd:%d", p_msg->msgtype, evt->fd);
+ break;
+ }
+ }
+ if (msg_controllen == 0 && iov.iov_len == 0)
+ goto err;
+ msg.msg_controllen = msg_controllen;
+ if (iov.iov_len == 0) iov.iov_len = 1;
+ ret = sendmsg(evt->peer->fd, &msg, 0);
+ uds_log("evt:%d sendmsg len:%d, controllen:%d errno:%s", evt->fd, ret, msg_controllen, strerror(errno));
+ for (int i = 0; i < fdnum; i++) {
+ close(fds[i]);
+ }
+ return EVENT_OK;
+err:
+ return EVENT_ERR;
+
+close_event:
+ uds_event_add_to_free(p_event_var, evt);
+ return EVENT_DEL;
+}
+
+int uds_diag_is_epoll_fd(int fd)
+{
+ for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+ if (fd == p_uds_var->efd[i])
+ return 1;
+ }
+ return 0;
+}
+
+void uds_diag_list_fd(char *buf, int len)
+{
+#define FDPATH_LEN 32
+ int pos = 0;
+ char path[32] = {0};
+ DIR *dir = NULL;
+ struct dirent *entry;
+ dir = opendir("/proc/self/fd/");
+ if (dir == NULL) {
+ uds_err("open path:/proc/self/fd/ failed");
+ return;
+ }
+ while (entry = readdir(dir)) {
+ int fd = atoi(entry->d_name);
+ char fdpath[FDPATH_LEN];
+ char link[FDPATH_LEN];
+ int ret;
+ if (fd <= 2 || uds_diag_is_epoll_fd(fd))
+ continue;
+ memset(fdpath, 0, FDPATH_LEN);
+ memset(link, 0, FDPATH_LEN);
+ sprintf(fdpath, "/proc/self/fd/%d", fd);
+ ret = readlink(fdpath, link, FDPATH_LEN);
+ pos += sprintf(&buf[pos], "+ fd:%s type:%u link:%s\n", entry->d_name, entry->d_type, link);
+ }
+ closedir(dir);
+ return;
+}
+
+int uds_diag_string(char *buf, int len)
+{
+ int pos = 0;
+ memset(buf, 0, len);
+ pos = sprintf(buf, "+-----------------------------Unix Proxy Diagnostic information-------------------------+\n");
+ pos += sprintf(&buf[pos], "+ Thread nums:%d\n", p_uds_var->work_thread_num);
+ for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+ pos += sprintf(&buf[pos], "+ Thread %d events count:%d\n", i+1, p_uds_var->work_thread[i].info.events);
+ }
+ pos += sprintf(&buf[pos], "+ Log level:%s\n", p_uds_var->logstr[p_uds_var->loglevel]);
+ strcat(buf, "+---------------------------------------------------------------------------------------+\n");
+ return strlen(buf);
+}
+
+// DIAG INFO
+int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ int connfd;
+ int len;
+ int ret;
+ struct uds_event *evt = (struct uds_event *)arg;
+ if (evt == NULL) {
+ uds_err("param is invalid.");
+ return EVENT_ERR;
+ }
+ connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+ if (connfd <= 0) {
+ uds_err("conn fd error:%d", connfd);
+ return EVENT_ERR;
+ }
+
+ uds_log("diag accept an new connection to send diag info, fd:%d", connfd);
+ len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len);
+ ret = send(connfd, p_event_var->iov_base, len, 0);
+ if (ret <= 0) {
+ uds_err("send diag info error, ret:%d len:%d", ret, len);
+ }
+ close(connfd);
+ return EVENT_OK;
+}
+
+#define UDS_LOG_STR(level) (level < 0 || level >= UDS_LOG_MAX) ? p_uds_var->logstr[UDS_LOG_MAX] : p_uds_var->logstr[level]
+int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+ int connfd;
+ int len;
+ int ret;
+ int cur;
+ struct uds_event *evt = (struct uds_event *)arg;
+ if (evt == NULL) {
+ uds_err("param is invalid.");
+ return EVENT_ERR;
+ }
+ connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+ if (connfd <= 0) {
+ uds_err("conn fd error:%d", connfd);
+ return EVENT_ERR;
+ }
+
+ cur = p_uds_var->loglevel;
+ if (cur + 1 < UDS_LOG_MAX) {
+ p_uds_var->loglevel += 1;
+ } else {
+ p_uds_var->loglevel = UDS_LOG_NONE;
+ }
+
+ uds_log("debug level accept a new connection, current level:%s change to:%s", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
+
+ len = sprintf(p_event_var->iov_base, "+---------------UDS LOG LEVEL UPDATE--------------+\n"
+ "+ Log level is:%s before, now change to :%s.\n"
+ "+-------------------------------------------------+\n", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
+
+ ret = send(connfd, p_event_var->iov_base, len, 0);
+ if (ret <= 0) {
+ uds_err("send debug level info error, ret:%d len:%d", ret, len);
+ }
+ close(connfd);
+ return EVENT_OK;
+}
diff --git a/qtfs/ipc/uds_event.h b/qtfs/ipc/uds_event.h
new file mode 100644
index 0000000..a52bf67
--- /dev/null
+++ b/qtfs/ipc/uds_event.h
@@ -0,0 +1,64 @@
+#ifndef __QTFS_UDS_EVENT_H__
+#define __QTFS_UDS_EVENT_H__
+
+#define UDS_EVENT_BUFLEN 4096
+#define UDS_PATH_MAX 1024
+
+enum EVENT_RETCODE {
+ EVENT_OK = 0,
+ EVENT_ERR = -1,
+ EVENT_DEL = -2, // del this event after return
+};
+
+enum TCP2TCP_TYPE {
+ MSG_NORMAL = 0xa5a5, // 消息类型从特殊数字开始,防止误识别消息
+ MSG_SCM_RIGHTS,
+ MSG_SCM_CREDENTIALS, // unix domain 扩展消息,预留
+ MSG_SCM_SECURITY, // unix domain 扩展消息,预留
+ MSG_GET_TARGET, // 控制消息,用于获取对端的target fd
+ MSG_SCM_PIPE, // 使用SCM传递了一个pipe
+ MSG_END, // tcp消息的结束体
+};
+
+enum TCPCNTL_TYPE {
+ MSGCNTL_UDS = 1, // uds代理模式
+ MSGCNTL_PIPE, // pipe匿名管道代理模式
+};
+
+// 因为要区分SCM_RIGHTS和普通消息,TCP到TCP需要有一个协议头
+struct uds_tcp2tcp {
+ int msgtype;
+ int msglen; // len of data
+ char data[0];
+};
+
+struct uds_msg_scmrights {
+ int flags; // open flags
+ char path[UDS_PATH_MAX];
+};
+
+enum {
+ SCM_PIPE_READ = 0,
+ SCM_PIPE_WRITE,
+ SCM_PIPE_NUM,
+};
+
+struct uds_stru_scm_pipe {
+ int dir; // 0: send read filedes; 1: send write filedes
+ // proxy通过scm rights接收到员pipe fd,后面消息回来时事件
+ // 会发生变化,所以需要回消息时带上,才能建立关联
+ int srcfd;
+};
+
+int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_pre_handler(struct uds_event *evt);
+int uds_event_pre_hook(struct uds_event_global_var *p_event_var);
+int uds_event_post_hook(struct uds_event_global_var *p_event_var);
+int uds_event_module_init(struct uds_event_global_var *p_event_var);
+void uds_event_module_fini(struct uds_event_global_var *p);
+
+#endif
+
diff --git a/qtfs/ipc/uds_main.c b/qtfs/ipc/uds_main.c
new file mode 100644
index 0000000..b479a60
--- /dev/null
+++ b/qtfs/ipc/uds_main.c
@@ -0,0 +1,556 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/epoll.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <sys/un.h>
+#include <netinet/udp.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+
+#include "../comm.h"
+#include "uds_main.h"
+#include "uds_event.h"
+
+struct uds_global_var g_uds_var = {.logstr = {"NONE", "ERROR", "INFO", "UNKNOWN"}};
+struct uds_global_var *p_uds_var = &g_uds_var;
+struct uds_event_global_var *g_event_var = NULL;
+
+struct uds_event *uds_alloc_event()
+{
+ struct uds_event *p = (struct uds_event *)malloc(sizeof(struct uds_event));
+ if (p == NULL) {
+ uds_err("malloc failed.");
+ return NULL;
+ }
+ memset(p, 0, sizeof(struct uds_event));
+ return p;
+}
+
+int uds_event_insert(int efd, struct uds_event *event)
+{
+ struct epoll_event evt;
+ evt.data.ptr = (void *)event;
+ evt.events = EPOLLIN;
+ if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) {
+ uds_err("epoll ctl add fd:%d event failed.", event->fd);
+ return -1;
+ }
+ return 0;
+}
+
+int uds_event_suspend(int efd, struct uds_event *event)
+{
+ int ret = epoll_ctl(efd, EPOLL_CTL_DEL, event->fd, NULL);
+ if (ret != 0) {
+ uds_err("failed to suspend fd:%d.", event->fd);
+ return -1;
+ }
+ return 0;
+}
+
+int uds_event_delete(int efd, int fd)
+{
+ int ret = epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL);
+ if (ret != 0) {
+ uds_err("failed to delete event fd:%d.", fd);
+ } else {
+ uds_log("event fd:%d deleted.", fd);
+ }
+ close(fd);
+ return ret;
+}
+
+void uds_main_loop(int efd, struct uds_thread_arg *arg)
+{
+ int n = 0;
+ int ret;
+ struct uds_event *udsevt;
+ struct epoll_event *evts = NULL;
+ struct uds_event_global_var *p_event_var = arg->p_event_var;
+ if (p_event_var == NULL) {
+ uds_err("event variable invalid.");
+ return;
+ }
+
+ evts = calloc(UDS_EPOLL_MAX_EVENTS, sizeof(struct epoll_event));
+ if (evts == NULL) {
+ uds_err("init calloc evts failed.");
+ return;
+ }
+ if (uds_event_module_init(p_event_var) == EVENT_ERR) {
+ uds_err("uds event module init failed, main loop not run.");
+ return;
+ }
+#ifdef QTFS_SERVER
+ extern int engine_run;
+ while (engine_run) {
+#else
+ while (1) {
+#endif
+ n = epoll_wait(efd, evts, UDS_EPOLL_MAX_EVENTS, 1000);
+ if (n == 0)
+ continue;
+ if (n < 0) {
+ uds_err("epoll wait return errcode:%d", n);
+ continue;
+ }
+ arg->info.events += n;
+ uds_event_pre_hook(p_event_var);
+ for (int i = 0; i < n; i++) {
+ udsevt = (struct uds_event *)evts[i].data.ptr;
+ uds_log("event fd:%d events:%d tofree:%d", udsevt->fd, evts[i].events, udsevt->tofree);
+ if (udsevt->handler == NULL) {
+ uds_err("bad event, fd:%d handler is NULL.", udsevt->fd);
+ continue;
+ }
+ // 预检查失败择不执行handler
+ if (uds_event_pre_handler(udsevt) == EVENT_ERR) {
+ continue;
+ }
+ ret = udsevt->handler(udsevt, efd, p_event_var);
+ // 此处释放当前事件,peer事件需要handler里面释放
+ if (ret == EVENT_DEL) {
+ uds_del_event(udsevt);
+ }
+ }
+ uds_event_post_hook(p_event_var);
+ }
+ uds_log("main loop exit.");
+ uds_event_module_fini(p_event_var);
+ return;
+}
+
+int uds_build_tcp_connection(struct uds_conn_arg *arg)
+{
+ const int sock_max_conn_num = 1024;
+
+ if (arg->cs > UDS_SOCKET_SERVER) {
+ uds_err("cs type %d is error.", arg->cs);
+ return -1;
+ }
+ struct sockaddr_in sock_addr = {
+ .sin_family = AF_INET,
+ };
+ int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ if (sock_fd < 0) {
+ uds_err("As %s failed, socket fd: %d, err:%s.",
+ (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server",
+ sock_fd, strerror(errno));
+ return -1;
+ }
+ arg->sockfd = sock_fd;
+
+ if (arg->cs == UDS_SOCKET_SERVER) {
+ sock_addr.sin_port = htons(p_uds_var->tcp.port);
+ sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.addr);
+ if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
+ uds_err("As server failed, bind error, err:%s.",
+ strerror(errno));
+ goto close_and_return;
+ }
+ if (listen(sock_fd, sock_max_conn_num) < 0) {
+ uds_err("As server listen failed, err:%s.", strerror(errno));
+ goto close_and_return;
+ }
+ } else {
+ sock_addr.sin_port = htons(p_uds_var->tcp.peerport);
+ sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.peeraddr);
+ if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in)) < 0) {
+ goto close_and_return;
+ }
+ arg->connfd = sock_fd;
+ uds_log("Connect to server successed, ip:%s port:%u", p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport);
+ }
+
+ return 0;
+close_and_return:
+ close(sock_fd);
+ return -1;
+}
+
+int uds_build_unix_connection(struct uds_conn_arg *arg)
+{
+ const int sock_max_conn_num = 5;
+ if (arg->cs > UDS_SOCKET_SERVER) {
+ uds_err("cs type %d is error.", arg->cs);
+ return -1;
+ }
+ struct sockaddr_un sock_addr = {
+ .sun_family = AF_UNIX,
+ };
+ int sock_fd = socket(AF_UNIX, arg->udstype, 0);
+
+ if (sock_fd < 0) {
+ uds_err("As %s failed, socket fd: %d, err:%s.",
+ (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server",
+ sock_fd, strerror(errno));
+ return -1;
+ }
+ strncpy(sock_addr.sun_path, arg->sun_path, sizeof(sock_addr.sun_path));
+ arg->sockfd = sock_fd;
+
+ if (arg->cs == UDS_SOCKET_SERVER) {
+ unlink(sock_addr.sun_path);
+ if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
+ uds_err("As server failed, bind error, err:%s.",
+ strerror(errno));
+ goto close_and_return;
+ }
+ if (listen(sock_fd, sock_max_conn_num) < 0) {
+ uds_err("As server listen failed, err:%s.", strerror(errno));
+ goto close_and_return;
+ }
+ } else {
+ if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_un)) < 0) {
+ goto close_and_return;
+ }
+ arg->connfd = sock_fd;
+ uds_log("Connect to server successed, sun path:%s", arg->sun_path);
+ }
+
+ return 0;
+close_and_return:
+ uds_log("close sockfd:%d and return", sock_fd);
+ close(sock_fd);
+ return -1;
+
+}
+
+int uds_sock_step_accept(int sock_fd, int family)
+{
+ struct sockaddr_in in_addr;
+ struct sockaddr_un un_addr;
+ socklen_t len = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_un);
+ int connfd;
+ if (family == AF_INET) {
+ connfd = accept(sock_fd, (struct sockaddr *)&in_addr, &len);
+ } else {
+ connfd = accept(sock_fd, (struct sockaddr *)&un_addr, &len);
+ }
+ if (connfd < 0) {
+ uds_err("Accept error:%d, err:%s.", connfd, strerror(errno));
+ return connfd;
+ }
+ if (family == AF_INET) {
+ uds_log("Accept success, ip:%s, port:%u",
+ inet_ntoa(in_addr.sin_addr),
+ ntohs(in_addr.sin_port));
+ } else {
+ uds_log("Accept success, sun path:%s", un_addr.sun_path);
+ }
+ return connfd;
+}
+
+struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv)
+{
+ struct uds_event *newevt = uds_alloc_event();
+ int hash = fd % p_uds_var->work_thread_num;
+ if (newevt == NULL || p_uds_var->efd[hash] <= 0) {
+ uds_err("alloc event failed, efd:%d hash:%d", p_uds_var->efd[hash], hash);
+ return NULL;
+ }
+
+ newevt->fd = fd;
+ newevt->peer = peer; // 如果tcp回应,消息转回uds这个fd
+ newevt->handler = handler;
+ newevt->priv = priv;
+ newevt->tofree = 0;
+ uds_event_insert(p_uds_var->efd[hash], newevt);
+ return newevt;
+}
+
+struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv)
+{
+ int hash = fd % p_uds_var->work_thread_num;
+ struct uds_event *newevt = uds_alloc_event();
+ if (newevt == NULL || p_uds_var->efd[hash] <= 0) {
+ uds_err("alloc event failed, efd:%d", p_uds_var->efd[hash]);
+ return NULL;
+ }
+
+ newevt->fd = fd;
+ newevt->peerfd = peerfd; // 如果tcp回应,消息转回uds这个fd
+ newevt->handler = handler;
+ newevt->priv = priv;
+ newevt->tofree = 0;
+ newevt->pipe = 1;
+ uds_event_insert(p_uds_var->efd[hash], newevt);
+ return newevt;
+}
+
+void uds_del_event(struct uds_event *evt)
+{
+ int hash = evt->fd % p_uds_var->work_thread_num;
+ if (evt->pipe == 1 &&evt->peerfd != -1) {
+ // pipe是单向,peerfd没有epoll事件,所以直接关闭
+ close(evt->peerfd);
+ evt->peerfd = -1;
+ }
+ uds_event_delete(p_uds_var->efd[hash], evt->fd);
+ free(evt);
+ return;
+}
+
+void uds_thread_diag_init(struct uds_thread_info *info)
+{
+ info->events = 0;
+ info->fdnum = 0;
+}
+
+void *uds_proxy_thread(void *arg)
+{
+ struct uds_thread_arg *parg = (struct uds_thread_arg *)arg;
+ uds_thread_diag_init(&parg->info);
+ uds_main_loop(parg->efd, parg);
+ return NULL;
+}
+
+struct uds_event *uds_init_unix_listener(const char *addr, int (*handler)(void *, int, struct uds_event_global_var *))
+{
+ struct uds_event *udsevt;
+ struct uds_conn_arg arg;
+ struct uds_conn_arg *parg = &arg;
+
+ parg->cs = UDS_SOCKET_SERVER;
+ strncpy(parg->sun_path, addr, sizeof(parg->sun_path));
+ parg->udstype = SOCK_STREAM;
+ if (uds_build_unix_connection(parg) != 0)
+ return NULL;
+ udsevt = uds_add_event(parg->sockfd, NULL, handler, NULL);
+ if (udsevt == NULL) {
+ uds_err("add unix listener event failed.");
+ return NULL;
+ }
+ return udsevt;
+}
+
+struct uds_event *uds_init_tcp_listener()
+{
+ struct uds_event *tcpevt;
+ struct uds_conn_arg arg;
+ struct uds_conn_arg *parg = &arg;
+ parg->cs = UDS_SOCKET_SERVER;
+ if (uds_build_tcp_connection(parg) != 0)
+ return NULL;
+
+ tcpevt = uds_add_event(parg->sockfd, NULL, uds_event_tcp_listener, NULL);
+ if (tcpevt == NULL)
+ return NULL;
+ return tcpevt;
+}
+
+void uds_thread_create()
+{
+ struct uds_conn_arg arg;
+ struct uds_conn_arg *parg = &arg;
+ struct uds_event *udsevt;
+ struct uds_event *tcpevt;
+ struct uds_event *diagevt;
+ struct uds_event *logevt;
+ int efd;
+
+ for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+ efd = epoll_create1(0);
+ if (efd == -1) {
+ uds_err("epoll create1 failed, i:%d.", i);
+ return;
+ }
+ p_uds_var->efd[i] = efd;
+ }
+
+ if ((udsevt = uds_init_unix_listener(UDS_BUILD_CONN_ADDR, uds_event_uds_listener)) == NULL)
+ return;
+
+ if ((tcpevt = uds_init_tcp_listener()) == NULL)
+ goto end;
+
+ if ((diagevt = uds_init_unix_listener(UDS_DIAG_ADDR, uds_event_diag_info)) == NULL)
+ goto end1;
+
+ if ((logevt = uds_init_unix_listener(UDS_LOGLEVEL_UPD, uds_event_debug_level)) == NULL)
+ goto end2;
+
+ do {
+ pthread_t *thrd = (pthread_t *)malloc(sizeof(pthread_t) * p_uds_var->work_thread_num);
+ struct uds_thread_arg *work_thread;
+ if (thrd == NULL) {
+ uds_err("thread info malloc failed.");
+ break;
+ }
+ work_thread = (struct uds_thread_arg *)malloc(sizeof(struct uds_thread_arg *) * p_uds_var->work_thread_num);
+ if (work_thread == NULL) {
+ uds_err("thread arg malloc failed.");
+ free(thrd);
+ break;
+ }
+
+ for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+ p_uds_var->work_thread[i].p_event_var = &g_event_var[i];
+ p_uds_var->work_thread[i].efd = p_uds_var->efd[i];
+ (void)pthread_create(&thrd[i], NULL, uds_proxy_thread, &p_uds_var->work_thread[i]);
+ }
+ p_uds_var->loglevel = UDS_LOG_NONE;
+ for (int i = 0; i < p_uds_var->work_thread_num; i++)
+ pthread_join(thrd[i], NULL);
+ free(thrd);
+ free(work_thread);
+ } while(0);
+end2:
+ uds_del_event(diagevt);
+end1:
+ uds_del_event(tcpevt);
+end:
+ uds_del_event(udsevt);
+ for (int i = 0; i < p_uds_var->work_thread_num; i++)
+ close(p_uds_var->efd[i]);
+
+ return;
+}
+
+int uds_set_pid()
+{
+ int fd = -1;
+ if (access(QTFS_CLIENT_DEV, 0) == 0) {
+ fd = open(QTFS_CLIENT_DEV, O_RDONLY | O_NONBLOCK);
+ if (fd < 0)
+ goto open_failed;
+ goto set;
+ }
+ if (access(QTFS_SERVER_DEV, 0) == 0) {
+ fd = open(QTFS_SERVER_DEV, O_RDONLY | O_NONBLOCK);
+ if (fd < 0)
+ goto open_failed;
+ goto set;
+ }
+ uds_err("qtfs dev(<%s> or <%s>) both not exist", QTFS_CLIENT_DEV, QTFS_SERVER_DEV);
+ return EVENT_ERR;
+
+open_failed:
+ uds_err("open %s failed, ret:%d", QTFS_CLIENT_DEV, fd);
+ return EVENT_ERR;
+
+set:
+ do {
+ int pid = getpid();
+ int ret = ioctl(fd, QTFS_IOCTL_UDS_PROXY_PID, &pid);
+ if (ret < 0) {
+ uds_err("ioctl failed to set pid:%d ret:%d", pid, ret);
+ return EVENT_ERR;
+ }
+ uds_log("set proxy pid:%d to qtfs successed.", pid);
+ } while (0);
+ close(fd);
+ return EVENT_OK;
+}
+
+int uds_env_prepare()
+{
+ DIR *dir;
+ if (access(UDS_BUILD_CONN_ADDR, 0) == 0)
+ return EVENT_OK;
+
+ if ((dir = opendir(UDS_BUILD_CONN_DIR)) == NULL) {
+ if (mkdir(UDS_BUILD_CONN_DIR, 0755) < 0) {
+ uds_err("mkdir %s failed.", UDS_BUILD_CONN_DIR);
+ }
+ } else {
+ closedir(dir);
+ }
+ int fd = open(UDS_BUILD_CONN_ADDR, O_RDONLY|O_CREAT, 0700);
+ if (fd < 0) {
+ uds_err("create file:%s failed.", UDS_BUILD_CONN_ADDR);
+ return EVENT_ERR;
+ }
+ uds_log("success to create %s.", UDS_BUILD_CONN_ADDR);
+ close(fd);
+ return EVENT_OK;
+}
+
+static void uds_sig_pipe(int signum)
+{
+ uds_log("uds proxy recv sigpipe and ignore");
+}
+
+void uds_helpinfo(char *argv[])
+{
+ uds_err("Usage:");
+ uds_err(" %s <addr> <port> <peeraddr> <peerport>.", argv[0]);
+ uds_err("Param:");
+ uds_err(" <addr> - server ip address");
+ uds_err(" <port> - port number");
+ uds_err(" <peeraddr> - peer address");
+ uds_err(" <peerport> - peer port");
+ return;
+}
+
+/*
+ * uds跨主机协同主程序,设计成镜像的,每一端2个线程:send thread、recv thread
+ * 在server侧线程由原engine拉起,在client侧新起一个engine进程
+ */
+#ifdef QTFS_SERVER
+int uds_proxy_main(int argc, char *argv[])
+#else
+int main(int argc, char *argv[])
+#endif
+{
+ p_uds_var->loglevel = UDS_LOG_INFO;
+#define ARG_NUM 6
+ if (argc != ARG_NUM) {
+ uds_helpinfo(argv);
+ return -1;
+ }
+ if (uds_set_pid() != EVENT_OK) {
+ uds_err("proxy failed to set pid.");
+ return -1;
+ }
+ if (uds_env_prepare() != EVENT_OK) {
+ uds_err("proxy prepare environment failed.");
+ return -1;
+ }
+ signal(SIGPIPE, uds_sig_pipe);
+ p_uds_var->work_thread_num = atoi(argv[1]);
+ if (p_uds_var->work_thread_num <= 0 || p_uds_var->work_thread_num > UDS_WORK_THREAD_MAX) {
+ uds_err("work thread num:%d is too large.(must small or equal than %d)", p_uds_var->work_thread_num, UDS_WORK_THREAD_MAX);
+ return -1;
+ }
+ p_uds_var->efd = (int *)malloc(sizeof(int) * p_uds_var->work_thread_num);
+ if (p_uds_var->efd == NULL) {
+ uds_err("efd malloc failed, num:%d", p_uds_var->work_thread_num);
+ return -1;
+ }
+
+ p_uds_var->work_thread = (struct uds_thread_arg *)malloc(sizeof(struct uds_thread_arg) * p_uds_var->work_thread_num);
+ if (p_uds_var->work_thread == NULL) {
+ uds_err("work thread var malloc failed.");
+ return -1;
+ }
+ p_uds_var->tcp.port = atoi(argv[3]);
+ strncpy(p_uds_var->tcp.addr, argv[2], 20);
+ p_uds_var->tcp.peerport = atoi(argv[5]);
+ strncpy(p_uds_var->tcp.peeraddr, argv[4], 20);
+
+ uds_log("uds proxy param thread num:%d ip:%s port:%u peerip:%s port:%u",
+ p_uds_var->work_thread_num, p_uds_var->tcp.addr, p_uds_var->tcp.port,
+ p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport);
+ g_event_var = (struct uds_event_global_var *)malloc(sizeof(struct uds_event_global_var) * p_uds_var->work_thread_num);
+ if (g_event_var == NULL) {
+ uds_err("event variable malloc failed");
+ return -1;
+ }
+ uds_thread_create();
+
+ return 0;
+}
diff --git a/qtfs/ipc/uds_main.h b/qtfs/ipc/uds_main.h
new file mode 100644
index 0000000..793cd2f
--- /dev/null
+++ b/qtfs/ipc/uds_main.h
@@ -0,0 +1,141 @@
+#ifndef __QTFS_UDS_MAIN_H__
+#define __QTFS_UDS_MAIN_H__
+
+#include <time.h>
+
+#include "uds_module.h"
+
+#define UDS_EPOLL_MAX_EVENTS 64
+#define UDS_WORK_THREAD_MAX 64
+
+extern struct uds_global_var *p_uds_var;
+
+enum {
+ UDS_LOG_NONE,
+ UDS_LOG_ERROR,
+ UDS_LOG_INFO,
+ UDS_LOG_MAX,
+};
+
+#define uds_log(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_INFO) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+#define uds_log2(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_INFO) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+#define uds_err(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_ERROR) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][ERROR:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+enum {
+ UDS_THREAD_EPWAIT = 1, // epoll wait status
+};
+struct uds_thread_info {
+ int fdnum;
+
+ int events;
+ int status;
+};
+
+struct uds_event_global_var {
+ int cur;
+ struct uds_event *tofree[UDS_EPOLL_MAX_EVENTS];
+ char *msg_control;
+ int msg_controllen;
+ char *msg_control_send;
+ int msg_controlsendlen;
+ char *iov_base;
+ int iov_len;
+ char *iov_base_send;
+ int iov_sendlen;
+ char *buf;
+ int buflen;
+};
+
+struct uds_event {
+ int fd; /* 本事件由这个fd触发 */
+ unsigned int tofree : 1, /* 1--in to free list; 0--not */
+ pipe : 1, // this is a pipe event
+ reserved : 30;
+ union {
+ struct uds_event *peer; /* peer event */
+ int peerfd; // scm pipe 场景单向导通,只需要一个fd即可
+ };
+ int (*handler)(void *, int, struct uds_event_global_var *); /* event处理函数 */
+ void *priv; // private data
+ char cpath[UDS_SUN_PATH_LEN];
+ char spath[UDS_SUN_PATH_LEN];
+};
+
+
+struct uds_thread_arg {
+ int efd;
+ struct uds_event_global_var *p_event_var;
+ struct uds_thread_info info;
+};
+
+struct uds_global_var {
+ int work_thread_num;
+ int *efd;
+ struct uds_thread_arg *work_thread;
+ int loglevel;
+ char *logstr[UDS_LOG_MAX + 1];
+ struct _tcp {
+ char addr[20];
+ unsigned short port;
+ char peeraddr[20];
+ unsigned short peerport;
+ } tcp;
+ struct _uds {
+ char sun_path[UDS_SUN_PATH_LEN];
+ } uds;
+};
+enum uds_cs {
+ UDS_SOCKET_CLIENT = 1,
+ UDS_SOCKET_SERVER,
+};
+
+struct uds_conn_arg {
+ int cs; // client(1) or server(2)
+
+ int udstype; // DGRAM or STREAM
+ char sun_path[UDS_SUN_PATH_LEN];
+ int sockfd;
+ int connfd;
+};
+
+struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv);
+struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv);
+int uds_sock_step_accept(int sockFd, int family);
+int uds_build_tcp_connection(struct uds_conn_arg *arg);
+int uds_build_unix_connection(struct uds_conn_arg *arg);
+void uds_del_event(struct uds_event *evt);
+int uds_event_suspend(int efd, struct uds_event *event);
+int uds_event_insert(int efd, struct uds_event *event);
+#ifdef QTFS_SERVER
+int uds_proxy_main(int argc, char *argv[]);
+#endif
+#endif
diff --git a/qtfs/ipc/uds_module.h b/qtfs/ipc/uds_module.h
new file mode 100644
index 0000000..9ccbb9d
--- /dev/null
+++ b/qtfs/ipc/uds_module.h
@@ -0,0 +1,19 @@
+#ifndef __QTFS_UDS_MODULE_H__
+#define __QTFS_UDS_MODULE_H__
+
+#define UDS_BUILD_CONN_ADDR "/var/run/qtfs/remote_uds.sock"
+#define UDS_DIAG_ADDR "/var/run/qtfs/uds_proxy_diag.sock"
+#define UDS_LOGLEVEL_UPD "/var/run/qtfs/uds_loglevel.sock"
+#define UDS_BUILD_CONN_DIR "/var/run/qtfs/"
+
+#define UDS_SUN_PATH_LEN 108 // from glibc
+struct uds_proxy_remote_conn_req {
+ unsigned short type;
+ unsigned short resv;
+ char sun_path[UDS_SUN_PATH_LEN];
+};
+struct uds_proxy_remote_conn_rsp {
+ int ret;
+};
+
+#endif
diff --git a/qtfs/misc.c b/qtfs/misc.c
index 98222bd..44da4e1 100644
--- a/qtfs/misc.c
+++ b/qtfs/misc.c
@@ -156,6 +156,13 @@ long qtfs_misc_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
}
break;
}
+ case QTFS_IOCTL_UDS_PROXY_PID:
+ if (copy_from_user(&qtfs_uds_proxy_pid, (void *)arg, sizeof(int))) {
+ qtfs_err("ioctl get uds proxy pid failed.");
+ break;
+ }
+ qtfs_info("ioctl get uds proxy process pid is %d", qtfs_uds_proxy_pid);
+ break;
}
return ret;
}
diff --git a/qtfs/qtfs/sb.c b/qtfs/qtfs/sb.c
index 7445fad..104d137 100644
--- a/qtfs/qtfs/sb.c
+++ b/qtfs/qtfs/sb.c
@@ -288,7 +288,7 @@ ssize_t qtfs_readiter(struct kiocb *kio, struct iov_iter *iov)
req->fd = private->fd;
if (req->fd <= 0) {
- qtfs_err("qtfs_readiter: invalid file(0x%llx)", req->fd);
+ qtfs_err("qtfs_readiter: invalid file(%d)", req->fd);
qtfs_conn_put_param(pvar);
return -EINVAL;
}
@@ -360,7 +360,7 @@ ssize_t qtfs_writeiter(struct kiocb *kio, struct iov_iter *iov)
req->d.fd = private->fd;
if (req->d.fd < 0) {
- qtfs_err("qtfs_write: invalid file(0x%llx)", req->d.fd);
+ qtfs_err("qtfs_write: invalid file(%d)", req->d.fd);
qtfs_conn_put_param(pvar);
return -EINVAL;
}
@@ -1172,7 +1172,7 @@ int qtfs_getattr(const struct path *path, struct kstat *stat, u32 req_mask, unsi
*stat = rsp->stat;
qtfs_debug("qtfs getattr success:<%s> blksiz:%u size:%lld mode:%o ino:%llu pathino:%lu. %s\n", req->path, rsp->stat.blksize,
rsp->stat.size, rsp->stat.mode, rsp->stat.ino, inode->i_ino, rsp->stat.ino != inode->i_ino ? "delete current inode" : "");
- if (inode->i_ino != rsp->stat.ino || rsp->stat.mode != inode->i_mode) {
+ if (inode->i_ino != rsp->stat.ino || inode->i_mode != rsp->stat.mode) {
if (inode->i_nlink > 0){
drop_nlink(inode);
}
diff --git a/qtfs/qtfs_server/Makefile b/qtfs/qtfs_server/Makefile
index 9c6bcd5..2ff826f 100644
--- a/qtfs/qtfs_server/Makefile
+++ b/qtfs/qtfs_server/Makefile
@@ -4,15 +4,26 @@ KBUILD=/lib/modules/$(shell uname -r)/build/
obj-m:=qtfs_server.o
qtfs_server-objs:=../conn.o fsops.o qtfs-server.o ../misc.o
+DEPGLIB=-lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include
+
all: qtfs_server engine
qtfs_server:
make -C $(KBUILD) M=$(PWD) modules
-engine:
- gcc -O2 -o engine user_engine.c -lpthread -lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include -DQTFS_SERVER
+engine: uds_event.o uds_main.o user_engine.o
+ gcc -O2 -o engine $^ -lpthread $(DEPGLIB) -I../ -I../ipc/ -DQTFS_SERVER
+
+user_engine.o:
+ cc -g -c -o user_engine.o user_engine.c $(DEPGLIB) -I../ -DQTFS_SERVER
+
+uds_event.o:
+ cc -g -c -o uds_event.o ../ipc/uds_event.c -DQTFS_SERVER
+
+uds_main.o:
+ cc -g -c -o uds_main.o ../ipc/uds_main.c -DQTFS_SERVER
clean:
make -C $(KBUILD) M=$(PWD) clean
rm -rf engine
- rm -rf ../*.o ../.*.o.cmd
+ rm -rf ../*.o
diff --git a/qtfs/qtfs_server/fsops.c b/qtfs/qtfs_server/fsops.c
index 61e8895..6c3e201 100644
--- a/qtfs/qtfs_server/fsops.c
+++ b/qtfs/qtfs_server/fsops.c
@@ -25,10 +25,11 @@
bool in_white_list(char *path, int type)
{
+ int i, in_wl = -1;
+
if (!whitelist[type]) {
return true;
}
- int i, in_wl = -1;
for (i = 0; i < whitelist[type]->len; i++) {
if (!strncmp(path, whitelist[type]->wl[i].path, whitelist[type]->wl[i].len)){
in_wl = i;
@@ -202,7 +203,7 @@ static int handle_statfs(struct qtserver_arg *arg)
static int handle_mount(struct qtserver_arg *arg)
{
struct path path;
- int ret, i, in_wl = -1;
+ int ret;
struct qtreq_mount *req = (struct qtreq_mount *)REQ(arg);
struct qtrsp_mount *rsp = (struct qtrsp_mount *)RSP(arg);
if (!in_white_list(req->path, QTFS_WHITELIST_MOUNT)) {
diff --git a/qtfs/qtfs_server/qtfs-server.c b/qtfs/qtfs_server/qtfs-server.c
index b0b8ab0..cbe07f0 100644
--- a/qtfs/qtfs_server/qtfs-server.c
+++ b/qtfs/qtfs_server/qtfs-server.c
@@ -214,11 +214,6 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a
qtfs_server_thread_run = arg;
break;
- case QTFS_IOCTL_ALLINFO:
- case QTFS_IOCTL_CLEARALL:
- case QTFS_IOCTL_LOGLEVEL:
- ret = qtfs_misc_ioctl(file, cmd, arg);
- break;
case QTFS_IOCTL_WHITELIST:
if (copy_from_user(&len, (void __user *)arg, sizeof(int))) {
qtfs_err("qtfs ioctl white init copy from user failed.");
@@ -239,6 +234,12 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a
qtfs_err("init %d list:%d %s", tmp->type, i, whitelist[tmp->type]->wl[i].path);
}
break;
+ case QTFS_IOCTL_ALLINFO:
+ case QTFS_IOCTL_CLEARALL:
+ case QTFS_IOCTL_LOGLEVEL:
+ case QTFS_IOCTL_UDS_PROXY_PID:
+ ret = qtfs_misc_ioctl(file, cmd, arg);
+ break;
default:
qtfs_err("qtfs misc ioctl unknown cmd:%u.", cmd);
break;
diff --git a/qtfs/qtfs_server/user_engine.c b/qtfs/qtfs_server/user_engine.c
index 547935c..a3d627d 100644
--- a/qtfs/qtfs_server/user_engine.c
+++ b/qtfs/qtfs_server/user_engine.c
@@ -14,6 +14,7 @@
#include <sys/epoll.h>
#include "comm.h"
+#include "ipc/uds_main.h"
char wl_type_str[QTFS_WHITELIST_MAX][10] = {"Open", "Write", "Read", "Readdir", "Mkdir", "Rmdir", "Create", "Unlink", "Rename", "Setattr", "Setxattr", "Mount"};
@@ -220,13 +221,12 @@ int qtfs_whitelist_init(int fd)
int main(int argc, char *argv[])
{
- if (argc != 3) {
- engine_out("Usage: %s <buf size> <number of threads>.", argv[0]);
- engine_out(" Example: %s 4096 16.", argv[0]);
+ if (argc != 7) {
+ engine_out("Usage: %s <number of threads> <uds proxy thread num> <host ip> <uds proxy port> <dpu ip> <uds proxy port>.", argv[0]);
+ engine_out(" Example: %s 16 1 192.168.10.10 12121 192.168.10.11 12121.", argv[0]);
return -1;
}
- int psize = atoi(argv[1]);
- int thread_nums = atoi(argv[2]);
+ int thread_nums = atoi(argv[1]);
int fd = open(QTFS_SERVER_FILE, O_RDONLY);
if (fd < 0) {
engine_err("qtfs server file:%s open failed, fd:%d.", QTFS_SERVER_FILE, fd);
@@ -247,9 +247,9 @@ int main(int argc, char *argv[])
pthread_t texec[QTFS_MAX_THREADS];
pthread_t tepoll;
- if (psize > QTFS_USERP_MAXSIZE || thread_nums > QTFS_MAX_THREADS) {
- engine_err("qtfs engine param invalid, size:%d(must <= %d) thread_nums:%d(must <= %d).",
- psize, QTFS_USERP_MAXSIZE, thread_nums, QTFS_MAX_THREADS);
+ if (thread_nums > QTFS_MAX_THREADS) {
+ engine_err("qtfs engine param invalid, thread_nums:%d(must <= %d).",
+ thread_nums, QTFS_MAX_THREADS);
goto end;
}
(void)ioctl(fd, QTFS_IOCTL_EXIT, 1);
@@ -257,24 +257,30 @@ int main(int argc, char *argv[])
signal(SIGKILL, qtfs_signal_int);
signal(SIGTERM, qtfs_signal_int);
- struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, psize);
+ struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, QTFS_USERP_SIZE);
if (userp == NULL) {
engine_out("qtfs engine userp init failed.");
goto end;
}
struct engine_arg arg[QTFS_MAX_THREADS];
for (int i = 0; i < thread_nums; i++) {
- arg[i].psize = psize;
+ arg[i].psize = QTFS_USERP_SIZE;
arg[i].fd = fd;
arg[i].thread_idx = i;
(void)pthread_create(&texec[i], NULL, qtfs_engine_kthread, &arg[i]);
}
(void)pthread_create(&tepoll, NULL, qtfs_engine_epoll_thread, &arg[0]);
+ // 必须放在这个位置,uds main里面最终也有join
+ if (uds_proxy_main(6, &argv[1]) != 0) {
+ engine_out("uds proxy start failed.");
+ goto engine_free;
+ }
for (int i = 0; i < thread_nums; i++) {
pthread_join(texec[i], NULL);
engine_out("qtfs engine join thread %d.", i);
}
pthread_join(tepoll, NULL);
+engine_free:
qtfs_engine_userp_free(userp, thread_nums);
engine_out("qtfs engine join epoll thread.");
end:
diff --git a/qtfs/qtinfo/qtinfo.c b/qtfs/qtinfo/qtinfo.c
index dc88da0..a8ba2e0 100644
--- a/qtfs/qtinfo/qtinfo.c
+++ b/qtfs/qtinfo/qtinfo.c
@@ -4,9 +4,13 @@
#include <fcntl.h>
#include <sys/ioctl.h>
#include <string.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include "qtinfo.h"
#include "comm.h"
+#include "ipc/uds_main.h"
#ifdef client
#define QTFS_DEV_NAME "/dev/qtfs_client"
@@ -312,6 +316,69 @@ void qtinfo_opt_p(int fd, char *support)
return;
}
+#define PATH_MAX 4096
+void qtinfo_opt_u()
+{
+ int len;
+ struct sockaddr_un svr;
+ int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sockfd < 0) {
+ qtinfo_err("Create socket fd failed.");
+ return;
+ }
+
+ memset(&svr, 0, sizeof(svr));
+ svr.sun_family = AF_UNIX;
+ strcpy(svr.sun_path, UDS_DIAG_ADDR);
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(svr.sun_path);
+ if (connect(sockfd, (struct sockaddr *)&svr, len) < 0) {
+ qtinfo_err("connect to %s failed.", UDS_DIAG_ADDR);
+ return;
+ }
+ while (1) {
+ char buf[256];
+ int n;
+ memset(buf, 0, 256);
+ n = recv(sockfd, buf, 256, 0);
+ if (n <= 0)
+ break;
+ qtinfo_out2("%s", buf);
+ }
+ close(sockfd);
+ return;
+}
+
+void qtinfo_opt_s()
+{
+ int len;
+ struct sockaddr_un svr;
+ int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sockfd < 0) {
+ qtinfo_err("Create socket fd failed.");
+ return;
+ }
+
+ memset(&svr, 0, sizeof(svr));
+ svr.sun_family = AF_UNIX;
+ strcpy(svr.sun_path, UDS_LOGLEVEL_UPD);
+ len = offsetof(struct sockaddr_un, sun_path) + strlen(svr.sun_path);
+ if (connect(sockfd, (struct sockaddr *)&svr, len) < 0) {
+ qtinfo_err("connect to %s failed.", UDS_LOGLEVEL_UPD);
+ return;
+ }
+ while (1) {
+ char buf[256];
+ int n;
+ memset(buf, 0, 256);
+ n = recv(sockfd, buf, 256, 0);
+ if (n <= 0)
+ break;
+ qtinfo_out2("%s", buf);
+ }
+ close(sockfd);
+ return;
+
+}
static void qtinfo_help(char *exec)
{
@@ -322,6 +389,8 @@ static void qtinfo_help(char *exec)
qtinfo_out(" -l, Set log level(valid param: \"NONE\", \"ERROR\", \"WARN\", \"INFO\", \"DEBUG\").");
qtinfo_out(" -t, For test informations.");
qtinfo_out(" -p, Epoll support file mode(1: any files; 0: only fifo).");
+ qtinfo_out(" -u, Display unix socket proxy diagnostic info");
+ qtinfo_out(" -s, Set unix socket proxy log level(Increase by 1 each time)");
}
int main(int argc, char *argv[])
@@ -334,7 +403,7 @@ int main(int argc, char *argv[])
qtinfo_err("open file %s failed.", QTFS_DEV_NAME);
return 0;
}
- while ((ch = getopt(argc, argv, "acl:tp:")) != -1) {
+ while ((ch = getopt(argc, argv, "acl:tp:us")) != -1) {
switch (ch) {
case 'a':
qtinfo_opt_a(fd);
@@ -351,6 +420,12 @@ int main(int argc, char *argv[])
case 'p':
qtinfo_opt_p(fd, optarg);
break;
+ case 'u':
+ qtinfo_opt_u();
+ break;
+ case 's':
+ qtinfo_opt_s();
+ break;
default:
qtinfo_help(argv[0]);
break;
diff --git a/qtfs/qtsock.c b/qtfs/qtsock.c
new file mode 100644
index 0000000..58b2eab
--- /dev/null
+++ b/qtfs/qtsock.c
@@ -0,0 +1,332 @@
+#include <linux/fs.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/ftrace.h>
+#include <linux/kprobes.h>
+#include <net/sock.h>
+#include <linux/netlink.h>
+#include <linux/un.h>
+
+#include "conn.h"
+#include "log.h"
+#include "comm.h"
+#include "qtfs/syscall.h"
+
+#define MAX_SOCK_PATH_LEN 108
+
+static struct socket *qtfs_sock = NULL;
+static struct mutex qtfs_sock_mutex;
+static char qtfs_sock_path[] = "/var/run/qtfs/remote_uds.sock";
+
+struct qtsock_wl_stru qtsock_wl;
+
+static struct sock *(*origin_unix_find_other)(struct net *net,
+ struct sockaddr_un *sunname, int len,
+ int type, unsigned int hash, int *error);
+
+struct ftrace_hook {
+ const char *name;
+ void *func;
+ void *origin;
+
+ unsigned long addr;
+ struct ftrace_ops ops;
+};
+
+struct ftrace_hook unix_find_other_hook;
+
+static int resolve_hook_address(struct ftrace_hook *hook)
+{
+ hook->addr = qtfs_kallsyms_lookup_name(hook->name);
+ if (!hook->addr) {
+ qtfs_warn("unresolved symbol during resolving hook address:%s\n", hook->name);
+ return -ENOENT;
+ }
+ *((unsigned long *)hook->origin) = hook->addr;
+
+ return 0;
+}
+
+static void notrace ftrace_thunk(unsigned long ip, unsigned long parent_ip,
+ struct ftrace_ops *ops, struct pt_regs *regs)
+{
+ struct ftrace_hook *hook = container_of(ops, struct ftrace_hook, ops);
+
+ if (!within_module(parent_ip, THIS_MODULE))
+ regs->ip = (unsigned long)hook->func;
+}
+
+int install_hook(struct ftrace_hook *hook)
+{
+ int err;
+
+ err = resolve_hook_address(hook);
+ if (err)
+ return err;
+
+ hook->ops.func = ftrace_thunk;
+ hook->ops.flags = FTRACE_OPS_FL_SAVE_REGS | FTRACE_OPS_FL_IPMODIFY;
+
+ err = ftrace_set_filter_ip(&hook->ops, hook->addr, 0, 0);
+ if (err) {
+ qtfs_err("ftrace_set_filter_ip failed:%d\n", err);
+ return err;
+ }
+
+ err = register_ftrace_function(&hook->ops);
+ if (err) {
+ qtfs_err("register_ftrace_function failed with :%d\n", err);
+ ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0);
+ return err;
+ }
+ qtfs_info("install hook(%s) done\n", hook->name);
+
+ return 0;
+}
+
+void remove_hook(struct ftrace_hook *hook)
+{
+ int err;
+
+ err = unregister_ftrace_function(&hook->ops);
+ if (err)
+ qtfs_err("unregister_ftrace_function failed:%d\n", err);
+
+ err = ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0);
+ if (err)
+ qtfs_err("ftrace_set_filter_ip failed:%d\n", err);
+ qtfs_info("remove hook(%s) done", hook->name);
+}
+
+struct qtfs_sock_req {
+ int magic;
+ int type;
+ char sunname[MAX_SOCK_PATH_LEN];
+};
+
+struct qtfs_sock_rsp {
+ int found;
+};
+
+static int qtsock_conn(void)
+{
+ int ret;
+ struct sockaddr_un saddr;
+
+ ret = mutex_lock_interruptible(&qtfs_sock_mutex);
+ if (ret <0) {
+ qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret);
+ return false;
+ }
+ // calling this function means qtfs_sock isn't working properly.
+ // so it's ok to release and clean old qtfs_sock
+ if (qtfs_sock) {
+ sock_release(qtfs_sock);
+ qtfs_sock = NULL;
+ }
+ // connect to userspace unix socket server
+ ret = __sock_create(&init_net, AF_UNIX, SOCK_STREAM, 0, &qtfs_sock, 1);
+ if (ret) {
+ qtfs_err("qtfs sock client init create sock failed:%d\n", ret);
+ mutex_unlock(&qtfs_sock_mutex);
+ return ret;
+ }
+ saddr.sun_family = PF_UNIX;
+ strcpy(saddr.sun_path, qtfs_sock_path);
+ ret = qtfs_sock->ops->connect(qtfs_sock, (struct sockaddr *)&saddr,
+ sizeof(struct sockaddr_un) - 1, 0);
+ if (ret) {
+ qtfs_err("qtfs sock client sock connect failed:%d\n", ret);
+ sock_release(qtfs_sock);
+ qtfs_sock = NULL;
+ mutex_unlock(&qtfs_sock_mutex);
+ return ret;
+ }
+
+ mutex_unlock(&qtfs_sock_mutex);
+ return ret;
+}
+
+bool qtfs_udsfind(char *sunname, int len, int type)
+{
+ struct qtfs_sock_req qs_req;
+ struct qtfs_sock_rsp qs_rsp;
+ struct kvec send_vec, recv_vec;
+ struct msghdr send_msg, recv_msg;
+ int ret;
+ int retry = 0, penalty = 100, i = 0;
+
+ // qtfs_sock still not initialized, try to connect to server
+ if (!qtfs_sock && (qtsock_conn() < 0)) {
+ qtfs_err("failed to connect to qtfs socket\n");
+ return false;
+ }
+ if (len > MAX_SOCK_PATH_LEN) {
+ qtfs_err("Invalid socket path name len(%d)\n", len);
+ return false;
+ }
+ memset(&qs_req, 0, sizeof(qs_req));
+ memset(&qs_rsp, 0, sizeof(qs_rsp));
+ strncpy(qs_req.sunname, sunname, len);
+ qs_req.type = type;
+ qs_req.magic = 0xDEADBEEF;
+
+ memset(&send_msg, 0, sizeof(send_msg));
+ memset(&send_vec, 0, sizeof(send_vec));
+ memset(&recv_msg, 0, sizeof(recv_msg));
+ memset(&recv_vec, 0, sizeof(recv_vec));
+
+ send_vec.iov_base = &qs_req;
+ send_vec.iov_len = sizeof(qs_req);
+ qtfs_info("qtfs uds find socket(%s), type(%d)\n", sunname, type);
+
+reconn:
+ if (retry) {
+ for (i = 0; i < retry; i++) {
+ if (qtsock_conn() == 0)
+ break;
+ qtfs_err("qtfs socket reconnect failed for %d trial", i+1);
+ penalty *= 2;
+ msleep(penalty);
+ }
+ }
+ ret = mutex_lock_interruptible(&qtfs_sock_mutex);
+ if (ret < 0) {
+ qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret);
+ return false;
+ }
+ if (!qtfs_sock) {
+ qtfs_err("qtfs_sock is NULL, please check\n");
+ mutex_unlock(&qtfs_sock_mutex);
+ return false;
+ }
+ send_msg.msg_flags |= MSG_NOSIGNAL;
+ ret = kernel_sendmsg(qtfs_sock, &send_msg, &send_vec, 1, sizeof(qs_req));
+ if (ret == -EPIPE && retry == 0) {
+ qtfs_err("uds find connection has broken, try to reconnect\n");
+ retry = 3;
+ mutex_unlock(&qtfs_sock_mutex);
+ goto reconn;
+ } else if (ret < 0) {
+ qtfs_err("Failed to send uds find message:%d\n", ret);
+ mutex_unlock(&qtfs_sock_mutex);
+ return false;
+ }
+
+ // waiting for response
+ recv_vec.iov_base = &qs_rsp;
+ recv_vec.iov_len = sizeof(qs_rsp);
+retry:
+ recv_msg.msg_flags |= MSG_NOSIGNAL;
+ ret = kernel_recvmsg(qtfs_sock, &recv_msg, &recv_vec, 1, sizeof(qs_rsp), 0);
+ if (ret == -ERESTARTSYS || ret == -EINTR) {
+ qtfs_err("uds remote find get interrupted, just retry");
+ msleep(1);
+ goto retry;
+ }
+ mutex_unlock(&qtfs_sock_mutex);
+ if (ret < 0) {
+ qtfs_err("Failed to receive uds find response:%d\n", ret);
+ return false;
+ }
+ qtfs_info("uds remote find socket(%s), type(%d), result:%s\n", sunname, type, qs_rsp.found ? "found" : "not found");
+ return qs_rsp.found;
+}
+
+static int uds_find_whitelist(const char *path)
+{
+ int i;
+ int ret = 1;
+ read_lock(&qtsock_wl.rwlock);
+ for (i = 0; i< qtsock_wl.nums; i++) {
+ if (strncmp(path, qtsock_wl.wl[i], strlen(qtsock_wl.wl[i])) == 0) {
+ ret = 0;
+ break;
+ }
+ }
+ read_unlock(&qtsock_wl.rwlock);
+ return ret;
+}
+
+static inline bool uds_is_proxy(void)
+{
+ return (current->tgid == qtfs_uds_proxy_pid);
+}
+
+static struct sock *qtfs_unix_find_other(struct net *net,
+ struct sockaddr_un *sunname, int len,
+ int type, unsigned int hash, int *error)
+{
+ struct sock *other = NULL;
+ bool found = false;
+
+ qtfs_debug("in qtfs_unix_find_other (%s)\n", sunname->sun_path);
+ other = origin_unix_find_other(net, sunname, len, type, hash, error);
+ if (other) {
+ qtfs_debug("find unix other sock(%s) locally", sunname->sun_path);
+ return other;
+ }
+
+ // do not call remote find if sunname is annomous or sunpath not in whitelist
+ if (!sunname->sun_path[0] || uds_find_whitelist(sunname->sun_path) ||
+ uds_is_proxy() == true) {
+ *error = -ECONNREFUSED;
+ return NULL;
+ }
+
+ qtfs_info("Failed to find unix other sock(%s) locally, try to find remotely\n", sunname->sun_path);
+ // refer userspace service to get remote socket status
+ // if found, which means userspace service has create this unix socket server, just go to origin_unix_find_other, it will be found
+ // if not found, return NULL
+ found = qtfs_udsfind(sunname->sun_path, len, type);
+ if (!found) {
+ qtfs_info("failed to find unix other sock(%s) remotely", sunname->sun_path);
+ *error = -ECONNREFUSED;
+ return NULL;
+ }
+ qtfs_info("find unix other sock(%s) remotely\n", sunname->sun_path);
+
+ // found it remotely, so we will inform userspace engine to create specfic unix socket and connect to qtfs server
+ // and call unix_find_other locally
+ // xxx: will this be called recursively? Hope not
+ return origin_unix_find_other(net, sunname, len, type, hash, error);
+}
+
+int qtfs_sock_init(void)
+{
+ qtfs_kallsyms_hack_init();
+
+ qtfs_info("in qtfs ftrace hook unix_find_other\n");
+ unix_find_other_hook.name = "unix_find_other";
+ unix_find_other_hook.func = qtfs_unix_find_other;
+ unix_find_other_hook.origin = &origin_unix_find_other;
+
+ install_hook(&unix_find_other_hook);
+ mutex_init(&qtfs_sock_mutex);
+ rwlock_init(&qtsock_wl.rwlock);
+ qtsock_wl.nums = 0;
+ qtsock_wl.wl = (char **)kmalloc(sizeof(char *) * QTSOCK_WL_MAX_NUM, GFP_KERNEL);
+ if (qtsock_wl.wl == NULL) {
+
+ qtfs_err("failed to kmalloc wl, max num:%d", QTSOCK_WL_MAX_NUM);
+ }
+
+ return 0;
+}
+
+void qtfs_sock_exit(void)
+{
+ int ret;
+ qtfs_info("exit qtfs ftrace, remove unix_find_other_hook\n");
+ remove_hook(&unix_find_other_hook);
+
+ ret = mutex_lock_interruptible(&qtfs_sock_mutex);
+ if (ret < 0)
+ qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret);
+ // close unix socket connected to userspace
+ if (qtfs_sock) {
+ sock_release(qtfs_sock);
+ qtfs_sock = NULL;
+ }
+ mutex_unlock(&qtfs_sock_mutex);
+}
--
2.33.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。