diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java index 0818bae574a54b659f69378ba105ecee3e0a88e0..1afd760c6dec09537f4091086e29d4a5713c2c9b 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java @@ -1,5 +1,6 @@ package cn.hamm.airpower.config; +import cn.hamm.airpower.websocket.WebSocketConfig; import lombok.Getter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,14 +30,23 @@ public class Configs { @Getter private static MqttConfig mqttConfig; + + /** + *

WebSocket配置

+ */ + @Getter + private static WebSocketConfig websocketConfig; + @Autowired Configs( CookieConfig cookieConfig, ServiceConfig serviceConfig, - MqttConfig mqttConfig + MqttConfig mqttConfig, + WebSocketConfig websocketConfig ) { Configs.cookieConfig = cookieConfig; Configs.serviceConfig = serviceConfig; Configs.mqttConfig = mqttConfig; + Configs.websocketConfig = websocketConfig; } } diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java index b44393cad668a3147aa97f51488a24cb27931217..802e9c711ebdb88bc8d01a1681ba4f9c0dab25bc 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java @@ -23,6 +23,11 @@ public class ServiceConfig { */ private String databasePrefix = "tenant_"; + /** + *

默认服务ID

+ */ + private int serviceId = Constant.ZERO_INT; + /** *

是否开启文档

*/ diff --git a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java index 5735aedad7a7df07ac12b716937af88e7a4ef5cd..ad256a9d8b5781f973b95f4ba3758b8b81a3d903 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java @@ -1,5 +1,6 @@ package cn.hamm.airpower.util; +import cn.hamm.airpower.websocket.WebsocketUtil; import jakarta.persistence.EntityManager; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -138,6 +139,12 @@ public class Utils { @Getter private static HttpServletResponse response; + /** + *

WebSocket工具类

+ */ + @Getter + private static WebsocketUtil websocketUtil; + /** *

数字工具

*/ @@ -193,7 +200,8 @@ public class Utils { HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, NumberUtil numberUtil, - StringUtil stringUtil + StringUtil stringUtil, + WebsocketUtil websocketUtil ) { Utils.redisUtil = redisUtil; Utils.emailUtil = emailUtil; @@ -217,6 +225,7 @@ public class Utils { Utils.response = httpServletResponse; Utils.numberUtil = numberUtil; Utils.stringUtil = stringUtil; + Utils.websocketUtil = websocketUtil; } /** diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..e1471678dde307c9d79d73f476196baaea786521 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java @@ -0,0 +1,61 @@ +package cn.hamm.airpower.websocket; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.jetbrains.annotations.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +/** + *

WebSocket配置

+ * + * @author Hamm + */ +@Component +@Data +@Accessors(chain = true) +@Configuration +@ConfigurationProperties("airpower.websocket") +public class WebSocketConfig implements WebSocketConfigurer { + /** + *

PING

+ */ + private String ping = "ping"; + + /** + *

PONG

+ */ + private String pong = "pong"; + + /** + *

Websocket 路径

+ */ + private String path = "/websocket"; + + /** + *

WebSocket支持方式

+ */ + private WebSocketSupport support = WebSocketSupport.NO; + + @Bean + public TextWebSocketHandler getWebSocketHandler() { + return new WebsocketHandler(); + } + + /** + *

添加WebSocket服务监听

+ * + * @param registry WebSocketHandlerRegistry + */ + @Override + public void registerWebSocketHandlers(@NotNull WebSocketHandlerRegistry registry) { + if (!support.equals(WebSocketSupport.NO)) { + registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*"); + } + } +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..c5258bbd237877f3712b0af97015b205886c4d82 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java @@ -0,0 +1,55 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.config.Constant; +import lombok.Data; +import lombok.experimental.Accessors; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.concurrent.atomic.AtomicLong; + +/** + *

WebSocket事件

+ * + * @author Hamm.cn + */ +@Data +@Accessors(chain = true) +public class WebSocketEvent { + /** + *

当前事件ID

+ */ + private static final AtomicLong CURRENT_EVENT_ID = new AtomicLong(Constant.ZERO_LONG); + + /** + *

事件ID

+ */ + private String id; + + /** + *

事件名称

+ */ + private String event = "message"; + + /** + *

消息产生时间

+ */ + private Long time; + + /** + *

消息对象

+ */ + private T data; + + WebSocketEvent() { + long time = System.currentTimeMillis(); + this.id = Base64.getEncoder().encodeToString((String.format( + "%s-%s-%s", + Configs.getServiceConfig().getServiceId(), + CURRENT_EVENT_ID.incrementAndGet(), + time + )).getBytes(StandardCharsets.UTF_8)); + this.time = time; + } +} \ No newline at end of file diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..4544a2357e25353f5b69f7bb6ebed30c76198676 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java @@ -0,0 +1,23 @@ +package cn.hamm.airpower.websocket; + +import lombok.Data; +import lombok.experimental.Accessors; + +/** + *

WebSocket消息基类

+ * + * @author Hamm.cn + */ +@Data +@Accessors(chain = true) +public class WebSocketMessage { + /** + *

消息类型

+ */ + private String type; + + /** + *

消息内容

+ */ + private Object payload; +} \ No newline at end of file diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java new file mode 100644 index 0000000000000000000000000000000000000000..1f0a905d10e7edcfce1fc83eb3a09b8a33bbe3e8 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java @@ -0,0 +1,23 @@ +package cn.hamm.airpower.websocket; + +/** + *

WebSocket支持

+ * + * @author Hamm.cn + */ +public enum WebSocketSupport { + /** + *

Redis

+ */ + REDIS, + + /** + *

MQTT

+ */ + MQTT, + + /** + *

不支持

+ */ + NO, +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..f5942948c3f737b006fee8bb050993ea72df6282 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java @@ -0,0 +1,184 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.exception.ServiceException; +import cn.hamm.airpower.model.Json; +import cn.hamm.airpower.util.Utils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + *

WebSocket Handler

+ * + * @author Hamm + */ +@Component +@Slf4j +public class WebsocketHandler extends TextWebSocketHandler implements MessageListener { + /** + *

Redis连接工厂

+ */ + @Autowired + private RedisConnectionFactory redisConnectionFactory; + + /** + *

订阅全频道

+ */ + public static String CHANNEL_ALL = "WEBSOCKET_ALL"; + + /** + *

订阅用户频道前缀

+ */ + public static String CHANNEL_USER_PREFIX = "WEBSOCKET_USER_"; + + /** + *

收到Websocket消息时

+ * + * @param session 会话 + * @param textMessage 文本消息 + */ + @Override + protected void handleTextMessage(@NonNull WebSocketSession session, @NotNull TextMessage textMessage) { + final String message = textMessage.getPayload(); + if (Configs.getWebsocketConfig().getPing().equals(message)) { + try { + WebSocketEvent webSocketEvent = new WebSocketEvent<>().setEvent(Configs.getWebsocketConfig().getPong()); + session.sendMessage(new TextMessage(Json.toString(webSocketEvent))); + } catch (IOException e) { + log.error("发送Websocket消息失败: {}", e.getMessage()); + } + } + } + + /** + *

连接就绪后监听队列

+ * + * @param session 会话 + */ + @Override + public void afterConnectionEstablished(@NonNull WebSocketSession session) { + if (Objects.isNull(session.getUri())) { + return; + } + String accessToken = session.getUri().getQuery(); + if (Objects.isNull(accessToken)) { + log.warn("没有传入AccessToken 即将关闭连接"); + closeConnection(session); + return; + } + long userId = Utils.getSecurityUtil().getIdFromAccessToken(accessToken); + switch (Configs.getWebsocketConfig().getSupport()) { + case REDIS: + startRedisListener(session, userId); + break; + case MQTT: + startMqttListener(session, userId); + break; + default: + throw new RuntimeException("WebSocket暂不支持"); + } + } + + /** + *

处理监听到的频道消息

+ * + * @param message 消息 + * @param session 连接 + */ + private void onChannelMessage(@NotNull String message, @NonNull WebSocketSession session) { + try { + session.sendMessage(new TextMessage(message)); + } catch (Exception exception) { + log.error("消息发送失败", exception); + } + } + + /** + *

开始监听Redis消息

+ * + * @param session 连接 + * @param userId 用户ID + */ + private void startRedisListener(@NotNull WebSocketSession session, long userId) { + final String personalChannel = CHANNEL_USER_PREFIX + userId; + redisConnectionFactory.getConnection().subscribe( + (message, pattern) -> { + synchronized (session) { + onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session); + } + }, + CHANNEL_ALL.getBytes(StandardCharsets.UTF_8), + personalChannel.getBytes(StandardCharsets.UTF_8) + ); + } + + /** + *

开始监听MQTT消息

+ * + * @param session WebSocket会话 + * @param userId 用户ID + */ + private void startMqttListener(@NotNull WebSocketSession session, long userId) { + try (MqttClient mqttClient = Utils.getMqttUtil().createClient()) { + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + synchronized (session) { + onChannelMessage(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), session); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + mqttClient.connect(Utils.getMqttUtil().createOption()); + final String personalChannel = CHANNEL_USER_PREFIX + userId; + String[] topics = {CHANNEL_ALL, personalChannel}; + mqttClient.subscribe(topics); + } catch (MqttException e) { + throw new ServiceException(e); + } + } + + @Override + public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) { + } + + @Override + public void onMessage(@NotNull Message message, byte[] pattern) { + } + + /** + *

关闭连接

+ * + * @param session 会话 + */ + private void closeConnection(@NotNull WebSocketSession session) { + try { + session.close(); + } catch (IOException e) { + log.error("关闭Websocket失败"); + } + } +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..76e5564425482ca72654522a1a6f5be6d91c3358 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java @@ -0,0 +1,63 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.model.Json; +import cn.hamm.airpower.util.Utils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.jetbrains.annotations.NotNull; +import org.springframework.stereotype.Component; + +/** + *

WebsocketUtil

+ * + * @author Hamm + */ +@Slf4j +@Component +public class WebsocketUtil { + /** + *

给所有人发消息

+ * + * @param message 消息内容 + */ + public final @NotNull WebSocketEvent sendToAll(WebSocketMessage message) { + return publish(WebsocketHandler.CHANNEL_ALL, message); + } + + /** + *

给指定用户发消息

+ * + * @param userId 用户ID + * @param message 消息内容 + */ + public final @NotNull WebSocketEvent sendToUser(long userId, WebSocketMessage message) { + return publish(WebsocketHandler.CHANNEL_USER_PREFIX + userId, message); + } + + /** + *

发布消息

+ * + * @param channel 频道 + * @param message 消息 + */ + private @NotNull WebSocketEvent publish(String channel, WebSocketMessage message) { + WebSocketEvent webSocketEvent = new WebSocketEvent<>(); + webSocketEvent.setData(message); + switch (Configs.getWebsocketConfig().getSupport()) { + case REDIS: + Utils.getRedisUtil().publish(channel, Json.toString(webSocketEvent)); + break; + case MQTT: + try { + Utils.getMqttUtil().publish(WebsocketHandler.CHANNEL_ALL, Json.toString(webSocketEvent)); + } catch (MqttException e) { + throw new RuntimeException("发布消息失败", e); + } + break; + default: + throw new RuntimeException("WebSocket暂不支持"); + } + return webSocketEvent; + } +}