diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java
index e1471678dde307c9d79d73f476196baaea786521..b96bddb068d97d6321cafdc310f776b0d75e2026 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebSocketConfig.java
@@ -1,21 +1,27 @@
package cn.hamm.airpower.websocket;
+import cn.hamm.airpower.exception.ServiceException;
import lombok.Data;
import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;
+import java.util.Objects;
+
/**
*
WebSocket配置
*
* @author Hamm
*/
+@Slf4j
@Component
@Data
@Accessors(chain = true)
@@ -42,6 +48,11 @@ public class WebSocketConfig implements WebSocketConfigurer {
*/
private WebSocketSupport support = WebSocketSupport.NO;
+ /**
+ * 发布订阅的频道前缀
+ */
+ private String channelPrefix;
+
@Bean
public TextWebSocketHandler getWebSocketHandler() {
return new WebsocketHandler();
@@ -54,8 +65,12 @@ public class WebSocketConfig implements WebSocketConfigurer {
*/
@Override
public void registerWebSocketHandlers(@NotNull WebSocketHandlerRegistry registry) {
- if (!support.equals(WebSocketSupport.NO)) {
- registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*");
+ if (support.equals(WebSocketSupport.NO)) {
+ return;
+ }
+ if (Objects.isNull(channelPrefix) || !StringUtils.hasText(channelPrefix)) {
+ throw new ServiceException("没有配置 airpower.websocket.channelPrefix, 无法启动WebSocket服务");
}
+ registry.addHandler(getWebSocketHandler(), path).setAllowedOrigins("*");
}
}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java
index f5942948c3f737b006fee8bb050993ea72df6282..cef57bd442d5aef5db4a1d7e663bcc113c960ef1 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketHandler.java
@@ -1,6 +1,7 @@
package cn.hamm.airpower.websocket;
import cn.hamm.airpower.config.Configs;
+import cn.hamm.airpower.config.Constant;
import cn.hamm.airpower.exception.ServiceException;
import cn.hamm.airpower.model.Json;
import cn.hamm.airpower.util.Utils;
@@ -115,14 +116,14 @@ public class WebsocketHandler extends TextWebSocketHandler implements MessageLis
* @param userId 用户ID
*/
private void startRedisListener(@NotNull WebSocketSession session, long userId) {
- final String personalChannel = CHANNEL_USER_PREFIX + userId;
+ final String personalChannel = Configs.getWebsocketConfig().getChannelPrefix() + Constant.UNDERLINE + CHANNEL_USER_PREFIX + userId;
redisConnectionFactory.getConnection().subscribe(
(message, pattern) -> {
synchronized (session) {
onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), session);
}
},
- CHANNEL_ALL.getBytes(StandardCharsets.UTF_8),
+ (Configs.getWebsocketConfig().getChannelPrefix() + Constant.UNDERLINE + CHANNEL_ALL).getBytes(StandardCharsets.UTF_8),
personalChannel.getBytes(StandardCharsets.UTF_8)
);
}
diff --git a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java
index 76e5564425482ca72654522a1a6f5be6d91c3358..d0b652da9c3a7ecdac18f55d8985402f07d13513 100644
--- a/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java
+++ b/airpower-core/src/main/java/cn/hamm/airpower/websocket/WebsocketUtil.java
@@ -1,12 +1,16 @@
package cn.hamm.airpower.websocket;
import cn.hamm.airpower.config.Configs;
+import cn.hamm.airpower.exception.ServiceException;
import cn.hamm.airpower.model.Json;
import cn.hamm.airpower.util.Utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.util.Objects;
/**
* WebsocketUtil
@@ -42,6 +46,10 @@ public class WebsocketUtil {
* @param message 消息
*/
private @NotNull WebSocketEvent publish(String channel, WebSocketMessage message) {
+ final String channelPrefix = Configs.getWebsocketConfig().getChannelPrefix();
+ if (Objects.isNull(channelPrefix) || !StringUtils.hasText(channelPrefix)) {
+ throw new ServiceException("没有配置 airpower.websocket.channelPrefix, 无法启动WebSocket服务");
+ }
WebSocketEvent webSocketEvent = new WebSocketEvent<>();
webSocketEvent.setData(message);
switch (Configs.getWebsocketConfig().getSupport()) {