From 2ce622ab7865d0cecccd0e8beb98f3e0a575c18f Mon Sep 17 00:00:00 2001 From: Hamm Date: Mon, 27 May 2024 15:20:51 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix(Bug):=20=E4=BC=98=E5=8C=96=E4=BA=86?= =?UTF-8?q?=E4=B8=80=E4=BA=9B=E5=B7=B2=E7=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/hamm/airpower/config/Configs.java | 12 +- .../hamm/airpower/config/ServiceConfig.java | 5 + .../java/cn/hamm/airpower/util/Utils.java | 11 +- .../airpower/websocket/WebSocketConfig.java | 61 ++++++ .../airpower/websocket/WebSocketEvent.java | 49 +++++ .../airpower/websocket/WebSocketMessage.java | 23 +++ .../airpower/websocket/WebSocketSupport.java | 23 +++ .../airpower/websocket/WebsocketHandler.java | 180 ++++++++++++++++++ .../airpower/websocket/WebsocketUtil.java | 62 ++++++ 9 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java create mode 100644 airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java index 0818bae..1afd760 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/config/Configs.java @@ -1,5 +1,6 @@ package cn.hamm.airpower.config; +import cn.hamm.airpower.websocket.WebSocketConfig; import lombok.Getter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,14 +30,23 @@ public class Configs { @Getter private static MqttConfig mqttConfig; + + /** + *

WebSocket配置

+ */ + @Getter + private static WebSocketConfig websocketConfig; + @Autowired Configs( CookieConfig cookieConfig, ServiceConfig serviceConfig, - MqttConfig mqttConfig + MqttConfig mqttConfig, + WebSocketConfig websocketConfig ) { Configs.cookieConfig = cookieConfig; Configs.serviceConfig = serviceConfig; Configs.mqttConfig = mqttConfig; + Configs.websocketConfig = websocketConfig; } } diff --git a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java index b44393c..802e9c7 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/config/ServiceConfig.java @@ -23,6 +23,11 @@ public class ServiceConfig { */ private String databasePrefix = "tenant_"; + /** + *

默认服务ID

+ */ + private int serviceId = Constant.ZERO_INT; + /** *

是否开启文档

*/ diff --git a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java index 5735aed..ad256a9 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/util/Utils.java @@ -1,5 +1,6 @@ package cn.hamm.airpower.util; +import cn.hamm.airpower.websocket.WebsocketUtil; import jakarta.persistence.EntityManager; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -138,6 +139,12 @@ public class Utils { @Getter private static HttpServletResponse response; + /** + *

WebSocket工具类

+ */ + @Getter + private static WebsocketUtil websocketUtil; + /** *

数字工具

*/ @@ -193,7 +200,8 @@ public class Utils { HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, NumberUtil numberUtil, - StringUtil stringUtil + StringUtil stringUtil, + WebsocketUtil websocketUtil ) { Utils.redisUtil = redisUtil; Utils.emailUtil = emailUtil; @@ -217,6 +225,7 @@ public class Utils { Utils.response = httpServletResponse; Utils.numberUtil = numberUtil; Utils.stringUtil = stringUtil; + Utils.websocketUtil = websocketUtil; } /** diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java new file mode 100644 index 0000000..e147167 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java @@ -0,0 +1,61 @@ +package cn.hamm.airpower.websocket; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.jetbrains.annotations.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +/** + *

WebSocket配置

+ * + * @author Hamm + */ +@Component +@Data +@Accessors(chain = true) +@Configuration +@ConfigurationProperties("airpower.websocket") +public class WebSocketConfig implements WebSocketConfigurer { + /** + *

PING

+ */ + private String ping = "ping"; + + /** + *

PONG

+ */ + private String pong = "pong"; + + /** + *

Websocket 路径

+ */ + private String path = "/websocket"; + + /** + *

WebSocket支持方式

+ */ + private WebSocketSupport support = WebSocketSupport.NO; + + @Bean + public TextWebSocketHandler getWebSocketHandler() { + return new WebsocketHandler(); + } + + /** + *

添加WebSocket服务监听

+ * + * @param registry WebSocketHandlerRegistry + */ + @Override + public void registerWebSocketHandlers(@NotNull WebSocketHandlerRegistry registry) { + if (!support.equals(WebSocketSupport.NO)) { + registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*"); + } + } +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java new file mode 100644 index 0000000..c294806 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java @@ -0,0 +1,49 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.config.Constant; +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.concurrent.atomic.AtomicLong; + +/** + *

WebSocket事件

+ * + * @author Hamm.cn + */ +@Data +@Accessors(chain = true) +public class WebSocketEvent { + /** + *

当前事件ID

+ */ + private static final AtomicLong CURRENT_EVENT_ID = new AtomicLong(Constant.ZERO_LONG); + + /** + *

事件ID

+ */ + private String eventId; + + /** + *

事件名称

+ */ + private String event = "message"; + + /** + *

消息产生时间

+ */ + private Long time; + + /** + *

消息对象

+ */ + private T data; + + WebSocketEvent() { + long time = System.currentTimeMillis(); + // time_serviceId_count + this.eventId = time + Constant.UNDERLINE + Configs.getServiceConfig().getServiceId() + Constant.UNDERLINE + CURRENT_EVENT_ID.getAndDecrement(); + this.time = time; + } +} \ No newline at end of file diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java new file mode 100644 index 0000000..4544a23 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketMessage.java @@ -0,0 +1,23 @@ +package cn.hamm.airpower.websocket; + +import lombok.Data; +import lombok.experimental.Accessors; + +/** + *

WebSocket消息基类

+ * + * @author Hamm.cn + */ +@Data +@Accessors(chain = true) +public class WebSocketMessage { + /** + *

消息类型

+ */ + private String type; + + /** + *

消息内容

+ */ + private Object payload; +} \ No newline at end of file diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java new file mode 100644 index 0000000..1f0a905 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketSupport.java @@ -0,0 +1,23 @@ +package cn.hamm.airpower.websocket; + +/** + *

WebSocket支持

+ * + * @author Hamm.cn + */ +public enum WebSocketSupport { + /** + *

Redis

+ */ + REDIS, + + /** + *

MQTT

+ */ + MQTT, + + /** + *

不支持

+ */ + NO, +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java new file mode 100644 index 0000000..8181650 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java @@ -0,0 +1,180 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.exception.ServiceException; +import cn.hamm.airpower.model.Json; +import cn.hamm.airpower.util.Utils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + *

WebSocket Handler

+ * + * @author Hamm + */ +@Component +@Slf4j +public class WebsocketHandler extends TextWebSocketHandler implements MessageListener { + /** + *

Redis连接工厂

+ */ + @Autowired + private RedisConnectionFactory redisConnectionFactory; + + /** + *

订阅全频道

+ */ + public static String CHANNEL_ALL = "WEBSOCKET_ALL"; + + /** + *

订阅用户频道前缀

+ */ + public static String CHANNEL_USER_PREFIX = "WEBSOCKET_USER_"; + + /** + *

收到Websocket消息时

+ * + * @param session 会话 + * @param textMessage 文本消息 + */ + @Override + protected void handleTextMessage(@NonNull WebSocketSession session, @NotNull TextMessage textMessage) { + final String message = textMessage.getPayload(); + if (Configs.getWebsocketConfig().getPing().equals(message)) { + try { + WebSocketEvent webSocketEvent = new WebSocketEvent<>().setEvent(Configs.getWebsocketConfig().getPong()); + session.sendMessage(new TextMessage(Json.toString(webSocketEvent))); + } catch (IOException e) { + log.error("发送Websocket消息失败: {}", e.getMessage()); + } + } + } + + /** + *

连接就绪后监听队列

+ * + * @param session 会话 + */ + @Override + public void afterConnectionEstablished(@NonNull WebSocketSession session) { + if (Objects.isNull(session.getUri())) { + return; + } + String accessToken = session.getUri().getQuery(); + if (Objects.isNull(accessToken)) { + log.warn("没有传入AccessToken 即将关闭连接"); + closeConnection(session); + return; + } + long userId = Utils.getSecurityUtil().getIdFromAccessToken(accessToken); + switch (Configs.getWebsocketConfig().getSupport()) { + case REDIS: + startRedisListener(session, userId); + break; + case MQTT: + startMqttListener(session, userId); + break; + default: + throw new RuntimeException("WebSocket暂不支持"); + } + } + + /** + *

处理监听到的频道消息

+ * + * @param message 消息 + * @param session 连接 + */ + private void onChannelMessage(@NotNull String message, @NonNull WebSocketSession session) { + try { + session.sendMessage(new TextMessage(message)); + } catch (Exception exception) { + log.error("消息发送失败", exception); + } + } + + /** + *

开始监听Redis消息

+ * + * @param session 连接 + * @param userId 用户ID + */ + private void startRedisListener(@NotNull WebSocketSession session, long userId) { + final String personalChannel = CHANNEL_USER_PREFIX + userId; + redisConnectionFactory.getConnection().subscribe( + (message, pattern) -> onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session), + CHANNEL_ALL.getBytes(StandardCharsets.UTF_8), + personalChannel.getBytes(StandardCharsets.UTF_8) + ); + } + + /** + *

开始监听MQTT消息

+ * + * @param session WebSocket会话 + * @param userId 用户ID + */ + private void startMqttListener(@NotNull WebSocketSession session, long userId) { + try (MqttClient mqttClient = Utils.getMqttUtil().createClient()) { + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + synchronized (session) { + onChannelMessage(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), session); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + mqttClient.connect(Utils.getMqttUtil().createOption()); + final String personalChannel = CHANNEL_USER_PREFIX + userId; + String[] topics = {CHANNEL_ALL, personalChannel}; + mqttClient.subscribe(topics); + } catch (MqttException e) { + throw new ServiceException(e); + } + } + + @Override + public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) { + } + + @Override + public void onMessage(@NotNull Message message, byte[] pattern) { + } + + /** + *

关闭连接

+ * + * @param session 会话 + */ + private void closeConnection(@NotNull WebSocketSession session) { + try { + session.close(); + } catch (IOException e) { + log.error("关闭Websocket失败"); + } + } +} diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java new file mode 100644 index 0000000..0e4687e --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java @@ -0,0 +1,62 @@ +package cn.hamm.airpower.websocket; + +import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.model.Json; +import cn.hamm.airpower.util.Utils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.stereotype.Component; + +/** + *

WebsocketUtil

+ * + * @author Hamm + */ +@Slf4j +@Component +public class WebsocketUtil { + /** + *

给所有人发消息

+ * + * @param message 消息内容 + */ + public final String sendToAll(WebSocketMessage message) { + return publish(WebsocketHandler.CHANNEL_ALL, message); + } + + /** + *

给指定用户发消息

+ * + * @param userId 用户ID + * @param message 消息内容 + */ + public final String sendToUser(long userId, WebSocketMessage message) { + return publish(WebsocketHandler.CHANNEL_USER_PREFIX + userId, message); + } + + /** + *

发布消息

+ * + * @param channel 频道 + * @param message 消息 + */ + private String publish(String channel, WebSocketMessage message) { + WebSocketEvent webSocketEvent = new WebSocketEvent<>(); + webSocketEvent.setData(message); + switch (Configs.getWebsocketConfig().getSupport()) { + case REDIS: + Utils.getRedisUtil().publish(channel, Json.toString(webSocketEvent)); + break; + case MQTT: + try { + Utils.getMqttUtil().publish(WebsocketHandler.CHANNEL_ALL, Json.toString(webSocketEvent)); + } catch (MqttException e) { + throw new RuntimeException("发布消息失败", e); + } + break; + default: + throw new RuntimeException("WebSocket暂不支持"); + } + return webSocketEvent.getEventId(); + } +} -- Gitee From 7a8a290bb0acd9fdc31629352658d0bc69e9d92b Mon Sep 17 00:00:00 2001 From: Hamm Date: Mon, 27 May 2024 15:49:30 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix(Bug):=20=E4=BC=98=E5=8C=96=E4=BA=86?= =?UTF-8?q?=E4=B8=80=E4=BA=9B=E5=B7=B2=E7=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/hamm/airpower/websocket/WebSocketEvent.java | 12 +++++++++--- .../cn/hamm/airpower/websocket/WebsocketHandler.java | 6 +++++- .../cn/hamm/airpower/websocket/WebsocketUtil.java | 9 +++++---- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java index c294806..c5258bb 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketEvent.java @@ -5,6 +5,8 @@ import cn.hamm.airpower.config.Constant; import lombok.Data; import lombok.experimental.Accessors; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.concurrent.atomic.AtomicLong; /** @@ -23,7 +25,7 @@ public class WebSocketEvent { /** *

事件ID

*/ - private String eventId; + private String id; /** *

事件名称

@@ -42,8 +44,12 @@ public class WebSocketEvent { WebSocketEvent() { long time = System.currentTimeMillis(); - // time_serviceId_count - this.eventId = time + Constant.UNDERLINE + Configs.getServiceConfig().getServiceId() + Constant.UNDERLINE + CURRENT_EVENT_ID.getAndDecrement(); + this.id = Base64.getEncoder().encodeToString((String.format( + "%s-%s-%s", + Configs.getServiceConfig().getServiceId(), + CURRENT_EVENT_ID.incrementAndGet(), + time + )).getBytes(StandardCharsets.UTF_8)); this.time = time; } } \ No newline at end of file diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java index 8181650..f594294 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java @@ -117,7 +117,11 @@ public class WebsocketHandler extends TextWebSocketHandler implements MessageLis private void startRedisListener(@NotNull WebSocketSession session, long userId) { final String personalChannel = CHANNEL_USER_PREFIX + userId; redisConnectionFactory.getConnection().subscribe( - (message, pattern) -> onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session), + (message, pattern) -> { + synchronized (session) { + onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session); + } + }, CHANNEL_ALL.getBytes(StandardCharsets.UTF_8), personalChannel.getBytes(StandardCharsets.UTF_8) ); diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java index 0e4687e..76e5564 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java @@ -5,6 +5,7 @@ import cn.hamm.airpower.model.Json; import cn.hamm.airpower.util.Utils; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttException; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; /** @@ -20,7 +21,7 @@ public class WebsocketUtil { * * @param message 消息内容 */ - public final String sendToAll(WebSocketMessage message) { + public final @NotNull WebSocketEvent sendToAll(WebSocketMessage message) { return publish(WebsocketHandler.CHANNEL_ALL, message); } @@ -30,7 +31,7 @@ public class WebsocketUtil { * @param userId 用户ID * @param message 消息内容 */ - public final String sendToUser(long userId, WebSocketMessage message) { + public final @NotNull WebSocketEvent sendToUser(long userId, WebSocketMessage message) { return publish(WebsocketHandler.CHANNEL_USER_PREFIX + userId, message); } @@ -40,7 +41,7 @@ public class WebsocketUtil { * @param channel 频道 * @param message 消息 */ - private String publish(String channel, WebSocketMessage message) { + private @NotNull WebSocketEvent publish(String channel, WebSocketMessage message) { WebSocketEvent webSocketEvent = new WebSocketEvent<>(); webSocketEvent.setData(message); switch (Configs.getWebsocketConfig().getSupport()) { @@ -57,6 +58,6 @@ public class WebsocketUtil { default: throw new RuntimeException("WebSocket暂不支持"); } - return webSocketEvent.getEventId(); + return webSocketEvent; } } -- Gitee