diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java
index 0818bae574a54b659f69378ba105ecee3e0a88e0..1afd760c6dec09537f4091086e29d4a5713c2c9b 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java
@@ -1,5 +1,6 @@
package cn.hamm.airpower.config;
+import cn.hamm.airpower.websocket.WebSocketConfig;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -29,14 +30,23 @@ public class Configs {
@Getter
private static MqttConfig mqttConfig;
+
+ /**
+ *
WebSocket配置
+ */
+ @Getter
+ private static WebSocketConfig websocketConfig;
+
@Autowired
Configs(
CookieConfig cookieConfig,
ServiceConfig serviceConfig,
- MqttConfig mqttConfig
+ MqttConfig mqttConfig,
+ WebSocketConfig websocketConfig
) {
Configs.cookieConfig = cookieConfig;
Configs.serviceConfig = serviceConfig;
Configs.mqttConfig = mqttConfig;
+ Configs.websocketConfig = websocketConfig;
}
}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java
index b44393cad668a3147aa97f51488a24cb27931217..802e9c711ebdb88bc8d01a1681ba4f9c0dab25bc 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java
@@ -23,6 +23,11 @@ public class ServiceConfig {
*/
private String databasePrefix = "tenant_";
+ /**
+ * 默认服务ID
+ */
+ private int serviceId = Constant.ZERO_INT;
+
/**
* 是否开启文档
*/
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java
index 5735aedad7a7df07ac12b716937af88e7a4ef5cd..ad256a9d8b5781f973b95f4ba3758b8b81a3d903 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java
@@ -1,5 +1,6 @@
package cn.hamm.airpower.util;
+import cn.hamm.airpower.websocket.WebsocketUtil;
import jakarta.persistence.EntityManager;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@@ -138,6 +139,12 @@ public class Utils {
@Getter
private static HttpServletResponse response;
+ /**
+ * WebSocket工具类
+ */
+ @Getter
+ private static WebsocketUtil websocketUtil;
+
/**
* 数字工具
*/
@@ -193,7 +200,8 @@ public class Utils {
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse,
NumberUtil numberUtil,
- StringUtil stringUtil
+ StringUtil stringUtil,
+ WebsocketUtil websocketUtil
) {
Utils.redisUtil = redisUtil;
Utils.emailUtil = emailUtil;
@@ -217,6 +225,7 @@ public class Utils {
Utils.response = httpServletResponse;
Utils.numberUtil = numberUtil;
Utils.stringUtil = stringUtil;
+ Utils.websocketUtil = websocketUtil;
}
/**
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..e1471678dde307c9d79d73f476196baaea786521
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java
@@ -0,0 +1,61 @@
+package cn.hamm.airpower.websocket;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+/**
+ * WebSocket配置
+ *
+ * @author Hamm
+ */
+@Component
+@Data
+@Accessors(chain = true)
+@Configuration
+@ConfigurationProperties("airpower.websocket")
+public class WebSocketConfig implements WebSocketConfigurer {
+ /**
+ * PING
+ */
+ private String ping = "ping";
+
+ /**
+ * PONG
+ */
+ private String pong = "pong";
+
+ /**
+ * Websocket 路径
+ */
+ private String path = "/websocket";
+
+ /**
+ * WebSocket支持方式
+ */
+ private WebSocketSupport support = WebSocketSupport.NO;
+
+ @Bean
+ public TextWebSocketHandler getWebSocketHandler() {
+ return new WebsocketHandler();
+ }
+
+ /**
+ * 添加WebSocket服务监听
+ *
+ * @param registry WebSocketHandlerRegistry
+ */
+ @Override
+ public void registerWebSocketHandlers(@NotNull WebSocketHandlerRegistry registry) {
+ if (!support.equals(WebSocketSupport.NO)) {
+ registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*");
+ }
+ }
+}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java
new file mode 100644
index 0000000000000000000000000000000000000000..c5258bbd237877f3712b0af97015b205886c4d82
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java
@@ -0,0 +1,55 @@
+package cn.hamm.airpower.websocket;
+
+import cn.hamm.airpower.config.Configs;
+import cn.hamm.airpower.config.Constant;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * WebSocket事件
+ *
+ * @author Hamm.cn
+ */
+@Data
+@Accessors(chain = true)
+public class WebSocketEvent {
+ /**
+ * 当前事件ID
+ */
+ private static final AtomicLong CURRENT_EVENT_ID = new AtomicLong(Constant.ZERO_LONG);
+
+ /**
+ * 事件ID
+ */
+ private String id;
+
+ /**
+ * 事件名称
+ */
+ private String event = "message";
+
+ /**
+ * 消息产生时间
+ */
+ private Long time;
+
+ /**
+ * 消息对象
+ */
+ private T data;
+
+ WebSocketEvent() {
+ long time = System.currentTimeMillis();
+ this.id = Base64.getEncoder().encodeToString((String.format(
+ "%s-%s-%s",
+ Configs.getServiceConfig().getServiceId(),
+ CURRENT_EVENT_ID.incrementAndGet(),
+ time
+ )).getBytes(StandardCharsets.UTF_8));
+ this.time = time;
+ }
+}
\ No newline at end of file
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..4544a2357e25353f5b69f7bb6ebed30c76198676
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java
@@ -0,0 +1,23 @@
+package cn.hamm.airpower.websocket;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * WebSocket消息基类
+ *
+ * @author Hamm.cn
+ */
+@Data
+@Accessors(chain = true)
+public class WebSocketMessage {
+ /**
+ * 消息类型
+ */
+ private String type;
+
+ /**
+ * 消息内容
+ */
+ private Object payload;
+}
\ No newline at end of file
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java
new file mode 100644
index 0000000000000000000000000000000000000000..1f0a905d10e7edcfce1fc83eb3a09b8a33bbe3e8
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java
@@ -0,0 +1,23 @@
+package cn.hamm.airpower.websocket;
+
+/**
+ * WebSocket支持
+ *
+ * @author Hamm.cn
+ */
+public enum WebSocketSupport {
+ /**
+ * Redis
+ */
+ REDIS,
+
+ /**
+ * MQTT
+ */
+ MQTT,
+
+ /**
+ * 不支持
+ */
+ NO,
+}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..f5942948c3f737b006fee8bb050993ea72df6282
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java
@@ -0,0 +1,184 @@
+package cn.hamm.airpower.websocket;
+
+import cn.hamm.airpower.config.Configs;
+import cn.hamm.airpower.exception.ServiceException;
+import cn.hamm.airpower.model.Json;
+import cn.hamm.airpower.util.Utils;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.lang.NonNull;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/**
+ * WebSocket Handler
+ *
+ * @author Hamm
+ */
+@Component
+@Slf4j
+public class WebsocketHandler extends TextWebSocketHandler implements MessageListener {
+ /**
+ * Redis连接工厂
+ */
+ @Autowired
+ private RedisConnectionFactory redisConnectionFactory;
+
+ /**
+ * 订阅全频道
+ */
+ public static String CHANNEL_ALL = "WEBSOCKET_ALL";
+
+ /**
+ * 订阅用户频道前缀
+ */
+ public static String CHANNEL_USER_PREFIX = "WEBSOCKET_USER_";
+
+ /**
+ * 收到Websocket消息时
+ *
+ * @param session 会话
+ * @param textMessage 文本消息
+ */
+ @Override
+ protected void handleTextMessage(@NonNull WebSocketSession session, @NotNull TextMessage textMessage) {
+ final String message = textMessage.getPayload();
+ if (Configs.getWebsocketConfig().getPing().equals(message)) {
+ try {
+ WebSocketEvent webSocketEvent = new WebSocketEvent<>().setEvent(Configs.getWebsocketConfig().getPong());
+ session.sendMessage(new TextMessage(Json.toString(webSocketEvent)));
+ } catch (IOException e) {
+ log.error("发送Websocket消息失败: {}", e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * 连接就绪后监听队列
+ *
+ * @param session 会话
+ */
+ @Override
+ public void afterConnectionEstablished(@NonNull WebSocketSession session) {
+ if (Objects.isNull(session.getUri())) {
+ return;
+ }
+ String accessToken = session.getUri().getQuery();
+ if (Objects.isNull(accessToken)) {
+ log.warn("没有传入AccessToken 即将关闭连接");
+ closeConnection(session);
+ return;
+ }
+ long userId = Utils.getSecurityUtil().getIdFromAccessToken(accessToken);
+ switch (Configs.getWebsocketConfig().getSupport()) {
+ case REDIS:
+ startRedisListener(session, userId);
+ break;
+ case MQTT:
+ startMqttListener(session, userId);
+ break;
+ default:
+ throw new RuntimeException("WebSocket暂不支持");
+ }
+ }
+
+ /**
+ * 处理监听到的频道消息
+ *
+ * @param message 消息
+ * @param session 连接
+ */
+ private void onChannelMessage(@NotNull String message, @NonNull WebSocketSession session) {
+ try {
+ session.sendMessage(new TextMessage(message));
+ } catch (Exception exception) {
+ log.error("消息发送失败", exception);
+ }
+ }
+
+ /**
+ * 开始监听Redis消息
+ *
+ * @param session 连接
+ * @param userId 用户ID
+ */
+ private void startRedisListener(@NotNull WebSocketSession session, long userId) {
+ final String personalChannel = CHANNEL_USER_PREFIX + userId;
+ redisConnectionFactory.getConnection().subscribe(
+ (message, pattern) -> {
+ synchronized (session) {
+ onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session);
+ }
+ },
+ CHANNEL_ALL.getBytes(StandardCharsets.UTF_8),
+ personalChannel.getBytes(StandardCharsets.UTF_8)
+ );
+ }
+
+ /**
+ * 开始监听MQTT消息
+ *
+ * @param session WebSocket会话
+ * @param userId 用户ID
+ */
+ private void startMqttListener(@NotNull WebSocketSession session, long userId) {
+ try (MqttClient mqttClient = Utils.getMqttUtil().createClient()) {
+ mqttClient.setCallback(new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable throwable) {
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ synchronized (session) {
+ onChannelMessage(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), session);
+ }
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+ }
+ });
+ mqttClient.connect(Utils.getMqttUtil().createOption());
+ final String personalChannel = CHANNEL_USER_PREFIX + userId;
+ String[] topics = {CHANNEL_ALL, personalChannel};
+ mqttClient.subscribe(topics);
+ } catch (MqttException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) {
+ }
+
+ @Override
+ public void onMessage(@NotNull Message message, byte[] pattern) {
+ }
+
+ /**
+ * 关闭连接
+ *
+ * @param session 会话
+ */
+ private void closeConnection(@NotNull WebSocketSession session) {
+ try {
+ session.close();
+ } catch (IOException e) {
+ log.error("关闭Websocket失败");
+ }
+ }
+}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..76e5564425482ca72654522a1a6f5be6d91c3358
--- /dev/null
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java
@@ -0,0 +1,63 @@
+package cn.hamm.airpower.websocket;
+
+import cn.hamm.airpower.config.Configs;
+import cn.hamm.airpower.model.Json;
+import cn.hamm.airpower.util.Utils;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.stereotype.Component;
+
+/**
+ * WebsocketUtil
+ *
+ * @author Hamm
+ */
+@Slf4j
+@Component
+public class WebsocketUtil {
+ /**
+ * 给所有人发消息
+ *
+ * @param message 消息内容
+ */
+ public final @NotNull WebSocketEvent sendToAll(WebSocketMessage message) {
+ return publish(WebsocketHandler.CHANNEL_ALL, message);
+ }
+
+ /**
+ * 给指定用户发消息
+ *
+ * @param userId 用户ID
+ * @param message 消息内容
+ */
+ public final @NotNull WebSocketEvent sendToUser(long userId, WebSocketMessage message) {
+ return publish(WebsocketHandler.CHANNEL_USER_PREFIX + userId, message);
+ }
+
+ /**
+ * 发布消息
+ *
+ * @param channel 频道
+ * @param message 消息
+ */
+ private @NotNull WebSocketEvent publish(String channel, WebSocketMessage message) {
+ WebSocketEvent webSocketEvent = new WebSocketEvent<>();
+ webSocketEvent.setData(message);
+ switch (Configs.getWebsocketConfig().getSupport()) {
+ case REDIS:
+ Utils.getRedisUtil().publish(channel, Json.toString(webSocketEvent));
+ break;
+ case MQTT:
+ try {
+ Utils.getMqttUtil().publish(WebsocketHandler.CHANNEL_ALL, Json.toString(webSocketEvent));
+ } catch (MqttException e) {
+ throw new RuntimeException("发布消息失败", e);
+ }
+ break;
+ default:
+ throw new RuntimeException("WebSocket暂不支持");
+ }
+ return webSocketEvent;
+ }
+}