diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml
index 1f67f6d19a35971acb604dc67d441429e65b3469..dddb998cd53713e5caade0066e83ac9340ecdc64 100644
--- a/smqtt-bootstrap/pom.xml
+++ b/smqtt-bootstrap/pom.xml
@@ -57,6 +57,21 @@
io.github.quickmsg
1.1.2
+
+ org.springframework.boot
+ spring-boot
+ 2.2.1.RELEASE
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ 2.2.1.RELEASE
+
+
+ io.github.quickmsg
+ smqtt-spring-boot-starter
+ 1.1.2
+
diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
new file mode 100644
index 0000000000000000000000000000000000000000..1a107e5f65d6c590f712a6bf779c08635094a460
--- /dev/null
+++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
@@ -0,0 +1,22 @@
+package io.github.quickmsg.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
+
+import io.github.quickmsg.starter.EnableMqttServer;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@SpringBootApplication
+@EnableConfigurationProperties
+@ComponentScan(basePackages = {"io.github.quickmsg"})
+@EnableMqttServer
+public class SpringBootStarter {
+ public static void main(String[] args) {
+ SpringApplication.run(SpringBootStarter.class, args);
+ }
+}
diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml
new file mode 100644
index 0000000000000000000000000000000000000000..acc60e3252ac8fc147edb7d0fb5783dbc71b2244
--- /dev/null
+++ b/smqtt-bootstrap/src/main/resources/application.yml
@@ -0,0 +1,61 @@
+ smqtt:
+ logLevel: INFO # 系统日志
+ tcp: # tcp配置
+ port: 1883 # mqtt端口号
+ username: APPID # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
+ password: 123456789 # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
+ wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG
+ bossThreadSize: 4 # boss线程
+ workThreadSize: 8 # work线程
+ lowWaterMark: 4000000 # 不建议配置 默认 32768
+ highWaterMark: 80000000 # 不建议配置 默认 65536
+ businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10
+ businessQueueSize: 100000 #业务队列 默认=100000
+ # globalReadWriteSize: 10000000,100000000 全局读写大小限制
+ # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制
+ ssl: # ssl配置
+ enable: false # 开关
+ key: /user/server.key # 指定ssl文件 默认系统生成
+ crt: /user/server.crt # 指定ssl文件 默认系统生成
+ http: # http相关配置 端口固定60000
+ enable: false # 开关
+ accessLog: true # http访问日志
+ ssl: # ssl配置
+ enable: false
+ admin: # 后台管理配置
+ enable: true # 开关
+ username: smqtt # 访问用户名
+ password: smqtt # 访问密码
+ ws: # websocket配置
+ enable: false # 开关
+ port: 8999 # 端口
+ path: /mqtt # ws 的访问path mqtt.js请设置此选项
+ cluster: # 集群配置
+ enable: false # 集群开关
+ url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点
+ port: 7771 # 端口
+ node: node-1 # 集群节点名称 唯一
+ external:
+ host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项
+ port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项
+ redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】
+ mode: sentinel
+ database: 7
+ password: xuanwu-T3st*17
+ timeout: 3000
+ poolMinIdle: 8
+ poolConnTimeout: 3000
+ poolSize: 10
+ single:
+ address: 172.16.1.16:6392
+ cluster:
+ scanInterval: 1000
+ nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392
+ readMode: SLAVE
+ retryAttempts: 3
+ slaveConnectionPoolSize: 64
+ masterConnectionPoolSize: 64
+ retryInterval: 1500
+ sentinel:
+ master: mq_master
+ nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373
\ No newline at end of file
diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml
deleted file mode 100644
index e4203080c7a10e0811258fd56cb1cf8b0f78465e..0000000000000000000000000000000000000000
--- a/smqtt-bootstrap/src/main/resources/test.yaml
+++ /dev/null
@@ -1,3 +0,0 @@
-smqtt:
- tcp:
- port: 7000
\ No newline at end of file
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
index ad5a890440d307136f1e9d9571441c9df4f33736..bd7da1b0cf77f7b9c6426c927b41ca9b3459ae3f 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
@@ -14,6 +14,9 @@ public class BootstrapKey {
/*redis前缀key*/
public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:";
+ /*redis前缀key 离线消息*/
+ public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:";
+
/*模式*/
public static final String REDIS_MODE = "redis.mode";
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
@@ -78,6 +78,7 @@ public class MessageProxy {
.topic(header.topicName())
.retain(fixedHeader.isRetain())
.qos(fixedHeader.qosLevel().value())
+ .properties(header.properties())
.build();
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
@@ -1,6 +1,7 @@
package io.github.quickmsg.common.message;
import io.github.quickmsg.common.utils.JacksonUtil;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.internal.StringUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,6 +32,8 @@ public class HeapMqttMessage {
private byte[] message;
+ private MqttProperties properties;
+
public Map getKeyMap() {
Map keys = new HashMap<>(5);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
index c21887dba432d7aa5498b958707ec25a005e632f..9bc4f8acf16be246410d4c927fd2f3f9bb960b79 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
@@ -47,4 +47,18 @@ public interface MessageRegistry extends StartUp {
List getRetainMessage(String topic);
+ /**
+ * 持久化离线消息
+ *
+ * @param message
+ */
+ void saveOfflineMessage(RetainMessage message);
+
+ /**
+ * 获取离线消息
+ *
+ * @param topic
+ * @return
+ */
+ List getOfflineMessage(String topic);
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..6d807b43de3ffc8e3547823fef5a9e5c02c5a136 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
@@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import java.util.List;
+import java.util.Map;
+
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
/**
@@ -11,6 +18,32 @@ import java.util.List;
*/
public class MqttMessageBuilder {
+ private static MqttProperties genMqttProperties(Map userPropertiesMap) {
+ MqttProperties mqttProperties = null;
+ if (userPropertiesMap != null) {
+ mqttProperties = new MqttProperties();
+ MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
+ for (Map.Entry entry : userPropertiesMap.entrySet()) {
+ userProperties.add(entry.getKey(), entry.getValue());
+ }
+ mqttProperties.add(userProperties);
+ }
+ return mqttProperties;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties);
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap));
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
@@ -69,7 +102,27 @@ public class MqttMessageBuilder {
return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader);
}
- public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) {
+ public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) {
+ if (MqttVersion.MQTT_5.protocolLevel() != (byte) version) {
+ switch (connectReturnCode) {
+ case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
+ connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+ break;
+ case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
+ connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
+ break;
+ case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
+ connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+ break;
+ case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
+ connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+ break;
+ case CONNECTION_REFUSED_NOT_AUTHORIZED:
+ connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+ break;
+
+ }
+ }
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false);
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..8f64c4cd9d5bd683a1240c1f32ffb6e537af4080 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -22,12 +27,27 @@ public class RetainMessage {
private byte[] body;
+ private String userProperties;
+
public static RetainMessage of(MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return RetainMessage.builder()
.topic(publishVariableHeader.topicName())
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil
+ .map2Json(Optional
+ .ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>();
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).get()))
.build();
}
@@ -37,7 +57,8 @@ public class RetainMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
index a3596af3aa8e4686cf1786d39598480c42efc2d8..2337caebeb9a07fb147c64fa985859e43955ec30 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -27,6 +32,8 @@ public class SessionMessage {
private boolean retain;
+ private String userProperties;
+
public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return SessionMessage.builder()
@@ -35,6 +42,19 @@ public class SessionMessage {
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.retain(mqttPublishMessage.fixedHeader().isRetain())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil
+ .map2Json(Optional
+ .ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>();
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).get()))
.build();
}
@@ -44,7 +64,8 @@ public class SessionMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
index fa197573b91107d755751afe9c9cf6c6e2877aa1..f25c2789230a5855f43f1a7e62446a2aee7d191f 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
@@ -67,7 +67,9 @@ public class MessageUtils {
MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader();
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength());
- MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId);
+ // mqtt 5 support properties
+ System.out.println( "----" + mqttPublishVariableHeader.properties().getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) );
+ MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties());
return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy());
}
diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml
index dbb26bfb59fcfad27bf26045f1afee2353a59415..e4e557ca1cb46362d5d78237ccb5985965ff4114 100644
--- a/smqtt-core/pom.xml
+++ b/smqtt-core/pom.xml
@@ -31,6 +31,11 @@
smqtt-metric-prometheus
1.1.2
+
+ io.github.quickmsg
+ smqtt-persistent-redis
+ 1.1.2
+
\ No newline at end of file
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..62d5f3a39dbf0be92f69cd5092cd3c3c2211d79f 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
@@ -56,7 +56,7 @@ public class ClusterReceiver {
MqttQoS.valueOf(heapMqttMessage.getQos()),
0,
heapMqttMessage.getTopic(),
- PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE);
+ PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE);
}
}
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
index 6e6bd47a320e2d820938b5e5f7a4f306e5c15a18..e90042add3e51b4fc70d8ec7486c70ab5102a435 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
@@ -68,7 +68,7 @@ public class ConnectProtocol implements Protocol {
if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) {
if (channelRegistry.exists(clientIdentifier)) {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
} else {
@@ -78,16 +78,16 @@ public class ConnectProtocol implements Protocol {
existMqttChannel.close().subscribe();
} else {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
}
}
/*protocol version support*/
if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()
- && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) {
+ && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
/*password check*/
@@ -147,11 +147,11 @@ public class ConnectProtocol implements Protocol {
eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext);
- return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false)
+ return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.version()), false)
.then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel)));
} else {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
} catch (Exception e) {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
index 1a405e0799b2a5f9ce21db92daa533e2fa29afbe..864b8bb624cba3d5e7386da426d8863c54b43ed8 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
@@ -30,6 +30,8 @@ public class PublishProtocol implements Protocol {
private static List MESSAGE_TYPE_LIST = new ArrayList<>();
+ private static String ONE2ONE_TOPIC_PREFIX = "ipush/mt/msg/";
+
static {
MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH);
}
@@ -52,6 +54,10 @@ public class PublishProtocol implements Protocol {
MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
Set mqttChannels = topicRegistry.getSubscribesByTopic(variableHeader.topicName(),
message.fixedHeader().qosLevel());
+ // 定制化处理,一对一发送的离线消息进行持久化,保证送达
+ if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) {
+ messageRegistry.saveOfflineMessage(RetainMessage.of(message));
+ }
// http mock
if (mqttChannel.getIsMock()) {
return send(mqttChannels, message, messageRegistry, filterRetainMessage(message, messageRegistry));
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
index 6a1c2f4d6409c53364b74d21a25338d99eb66d72..4cb98a0bf7a51b871358ebd04c93ecee95b53dbe 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
@@ -11,13 +11,13 @@ import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -40,7 +40,10 @@ public class SubscribeProtocol implements Protocol {
Set mqttTopicSubscriptions =
message.payload().topicSubscriptions()
.stream()
- .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()))
+ .peek(mqttTopicSubscription -> {
+ this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName());
+ this.loadOfflineMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName());
+ })
.map(mqttTopicSubscription ->
new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel))
.collect(Collectors.toSet());
@@ -57,6 +60,12 @@ public class SubscribeProtocol implements Protocol {
.collect(Collectors.toList())), false));
}
+ /**
+ * 获取保留消息
+ * @param messageRegistry
+ * @param mqttChannel
+ * @param topicName
+ */
private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) {
messageRegistry.getRetainMessage(topicName)
.forEach(retainMessage ->
@@ -64,6 +73,19 @@ public class SubscribeProtocol implements Protocol {
.subscribe());
}
+ /**
+ * 获取离线消息
+ * @param messageRegistry
+ * @param mqttChannel
+ * @param topicName
+ */
+ private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) {
+ Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList ->
+ msgList.forEach(retainMessage ->
+ mqttChannel.write(retainMessage.toPublishMessage(mqttChannel), retainMessage.getQos() > 0)
+ .subscribe()));
+ }
+
@Override
public List getMqttMessageTypes() {
return MESSAGE_TYPE_LIST;
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
index 256d8170f42227019a8c1ec680095a9ecc04b99f..b3f2919bdae5b89530102250afe2ecc4e84ae88e 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
@@ -5,6 +5,7 @@ import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.utils.TopicRegexUtils;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,4 +48,15 @@ public class DefaultMessageRegistry implements MessageRegistry {
.collect(Collectors.toList());
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ // TODO impl
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public void saveOfflineMessage(RetainMessage offlineMessage) {
+ // TODO impl
+ }
+
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
index 92992c29dee58d8e71718b6cf1bbaf86452e9a35..bde255509e152554dad6aa185955d78f79978061 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
@@ -198,4 +198,15 @@ public class DbMessageRegistry implements MessageRegistry {
return StringUtils.isBlank(body) ? null : body.getBytes(CharsetUtil.UTF_8);
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ // TODO impl
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public void saveOfflineMessage(RetainMessage offlineMessage) {
+ // TODO impl
+ }
+
}
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
index f30dc0208e235749b77df0392bfca0e6fdbb4c5b..086c987049d3960ec93cd0f42d93a0d164b7b89d 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
@@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable {
private byte[] body;
+ private String userProperties;
+
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
index 2966a12546e27774fa61ae8c630f5a11c8bbcd81..418d356b3e9f8539f0d90a4a41ca1b424898e5c5 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
@@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable {
private Boolean retain;
+ private String userProperties;
+
private byte[] body;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
index a24f5a978ac3192ed52b45a166feac363b035337..a31e2df9eb06d505f872b80c95d9d7c4a3cc88a0 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
@@ -2,7 +2,6 @@ package io.github.quickmsg.persistent.registry;
import io.github.quickmsg.common.bootstrap.BootstrapKey;
import io.github.quickmsg.common.config.BootstrapConfig;
-import io.github.quickmsg.common.environment.EnvContext;
import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
@@ -15,10 +14,11 @@ import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RKeys;
import org.redisson.api.RList;
+import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
-import java.sql.Connection;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -59,6 +59,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.qos(record.getQos())
.retain(record.getRetain())
.body(record.getBody())
+ .userProperties(record.getUserProperties())
.build()
).collect(Collectors.toList());
@@ -79,6 +80,7 @@ public class RedisMessageRegistry implements MessageRegistry {
int qos = sessionMessage.getQos();
boolean retain = sessionMessage.isRetain();
byte[] body = sessionMessage.getBody();
+ String userProperty = sessionMessage.getUserProperties();
try {
SessionMessageEntity sessionMessageEntity = SessionMessageEntity.builder()
@@ -87,6 +89,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.qos(qos)
.body(body)
.retain(retain)
+ .userProperties(userProperty)
.createTime(new Date()).build();
RList list = redissonClient.getList(BootstrapKey.Redis.REDIS_SESSION_MESSAGE_PREFIX_KEY + clientIdentifier);
@@ -110,6 +113,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.topic(topic)
.qos(qos)
.body(retainMessage.getBody())
+ .userProperties(retainMessage.getUserProperties())
.createTime(date)
.updateTime(date).build();
@@ -137,6 +141,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.topic(item.getTopic())
.qos(item.getQos())
.body(item.getBody())
+ .userProperties(item.getUserProperties())
.build()
).orElse(null);
})
@@ -148,4 +153,49 @@ public class RedisMessageRegistry implements MessageRegistry {
return Collections.emptyList();
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ // 一对一发送,只需要精确匹配
+ try {
+ RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic);
+ List resList = set.stream().map(record -> RetainMessage.builder()
+ .topic(record.getTopic())
+ .qos(record.getQos())
+ .body(record.getBody())
+ .userProperties(record.getUserProperties())
+ .build()
+ ).collect(Collectors.toList());
+
+ if (set.size() > 0) {
+ redissonClient.getBucket(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic).delete();
+ }
+ return resList;
+ } catch (Exception e) {
+ log.error("getOfflineMessage error clientIdentifier:{}", topic, e);
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void saveOfflineMessage(RetainMessage offlineMessage) {
+ String topic = offlineMessage.getTopic();
+ int qos = offlineMessage.getQos();
+ byte[] body = offlineMessage.getBody();
+ String userProperties = offlineMessage.getUserProperties();
+ try {
+ RetainMessageEntity offlineMessageEntity = RetainMessageEntity.builder()
+ .topic(topic)
+ .qos(qos)
+ .body(body)
+ .userProperties(userProperties)
+ .createTime(new Date())
+ .build();
+
+ RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic);
+ set.add(offlineMessageEntity, 1, TimeUnit.DAYS);
+ } catch (Exception e) {
+ log.error("saveOfflineMessage error message: {}", topic, e);
+ }
+ }
+
}
diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
index 2b28c1e7300118dbe560a73bdef6a1f23472eddc..f320d965bc38d88abde60ed0c62d20f9386c815e 100644
--- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
+++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
@@ -47,7 +47,7 @@ public class TopicRuleNode implements RuleNode {
log.info("rule engine TopicRuleNode request {}", heapMqttMessage);
ProtocolAdaptor protocolAdaptor = receiveContext.getProtocolAdaptor();
protocolAdaptor.chooseProtocol(MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()),
- new SmqttMessage<>(getMqttMessage(heapMqttMessage),heapMqttMessage.getTimestamp(),Boolean.TRUE), receiveContext);
+ new SmqttMessage<>(getMqttMessage(heapMqttMessage), heapMqttMessage.getTimestamp(), Boolean.TRUE), receiveContext);
executeNext(contextView);
}
@@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode {
MqttQoS.valueOf(heapMqttMessage.getQos()),
0,
this.topic,
- PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()));
+ PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()),
+ heapMqttMessage.getProperties());
}