diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java index e1471678dde307c9d79d73f476196baaea786521..b96bddb068d97d6321cafdc310f776b0d75e2026 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java @@ -1,21 +1,27 @@ package cn.hamm.airpower.websocket; +import cn.hamm.airpower.exception.ServiceException; import lombok.Data; import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.TextWebSocketHandler; +import java.util.Objects; + /** *

WebSocket配置

* * @author Hamm */ +@Slf4j @Component @Data @Accessors(chain = true) @@ -42,6 +48,11 @@ public class WebSocketConfig implements WebSocketConfigurer { */ private WebSocketSupport support = WebSocketSupport.NO; + /** + *

发布订阅的频道前缀

+ */ + private String channelPrefix; + @Bean public TextWebSocketHandler getWebSocketHandler() { return new WebsocketHandler(); @@ -54,8 +65,12 @@ public class WebSocketConfig implements WebSocketConfigurer { */ @Override public void registerWebSocketHandlers(@NotNull WebSocketHandlerRegistry registry) { - if (!support.equals(WebSocketSupport.NO)) { - registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*"); + if (support.equals(WebSocketSupport.NO)) { + return; + } + if (Objects.isNull(channelPrefix) || !StringUtils.hasText(channelPrefix)) { + throw new ServiceException("没有配置 airpower.websocket.channelPrefix, 无法启动WebSocket服务"); } + registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*"); } } diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java index f5942948c3f737b006fee8bb050993ea72df6282..cef57bd442d5aef5db4a1d7e663bcc113c960ef1 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java @@ -1,6 +1,7 @@ package cn.hamm.airpower.websocket; import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.config.Constant; import cn.hamm.airpower.exception.ServiceException; import cn.hamm.airpower.model.Json; import cn.hamm.airpower.util.Utils; @@ -115,14 +116,14 @@ public class WebsocketHandler extends TextWebSocketHandler implements MessageLis * @param userId 用户ID */ private void startRedisListener(@NotNull WebSocketSession session, long userId) { - final String personalChannel = CHANNEL_USER_PREFIX + userId; + final String personalChannel = Configs.getWebsocketConfig().getChannelPrefix() + Constant.UNDERLINE + CHANNEL_USER_PREFIX + userId; redisConnectionFactory.getConnection().subscribe( (message, pattern) -> { synchronized (session) { onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session); } }, - CHANNEL_ALL.getBytes(StandardCharsets.UTF_8), + (Configs.getWebsocketConfig().getChannelPrefix() + Constant.UNDERLINE + CHANNEL_ALL).getBytes(StandardCharsets.UTF_8), personalChannel.getBytes(StandardCharsets.UTF_8) ); } diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java index 76e5564425482ca72654522a1a6f5be6d91c3358..d0b652da9c3a7ecdac18f55d8985402f07d13513 100644 --- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java +++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java @@ -1,12 +1,16 @@ package cn.hamm.airpower.websocket; import cn.hamm.airpower.config.Configs; +import cn.hamm.airpower.exception.ServiceException; import cn.hamm.airpower.model.Json; import cn.hamm.airpower.util.Utils; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttException; import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import java.util.Objects; /** *

WebsocketUtil

@@ -42,6 +46,10 @@ public class WebsocketUtil { * @param message 消息 */ private @NotNull WebSocketEvent publish(String channel, WebSocketMessage message) { + final String channelPrefix = Configs.getWebsocketConfig().getChannelPrefix(); + if (Objects.isNull(channelPrefix) || !StringUtils.hasText(channelPrefix)) { + throw new ServiceException("没有配置 airpower.websocket.channelPrefix, 无法启动WebSocket服务"); + } WebSocketEvent webSocketEvent = new WebSocketEvent<>(); webSocketEvent.setData(message); switch (Configs.getWebsocketConfig().getSupport()) {