diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index 1f67f6d19a35971acb604dc67d441429e65b3469..dddb998cd53713e5caade0066e83ac9340ecdc64 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -57,6 +57,21 @@ io.github.quickmsg 1.1.2 + + org.springframework.boot + spring-boot + 2.2.1.RELEASE + + + org.springframework.boot + spring-boot-autoconfigure + 2.2.1.RELEASE + + + io.github.quickmsg + smqtt-spring-boot-starter + 1.1.2 + diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java new file mode 100644 index 0000000000000000000000000000000000000000..1a107e5f65d6c590f712a6bf779c08635094a460 --- /dev/null +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java @@ -0,0 +1,22 @@ +package io.github.quickmsg.springboot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; + +import io.github.quickmsg.starter.EnableMqttServer; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@SpringBootApplication +@EnableConfigurationProperties +@ComponentScan(basePackages = {"io.github.quickmsg"}) +@EnableMqttServer +public class SpringBootStarter { + public static void main(String[] args) { + SpringApplication.run(SpringBootStarter.class, args); + } +} diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml new file mode 100644 index 0000000000000000000000000000000000000000..acc60e3252ac8fc147edb7d0fb5783dbc71b2244 --- /dev/null +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -0,0 +1,61 @@ + smqtt: + logLevel: INFO # 系统日志 + tcp: # tcp配置 + port: 1883 # mqtt端口号 + username: APPID # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 + password: 123456789 # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 + wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG + bossThreadSize: 4 # boss线程 + workThreadSize: 8 # work线程 + lowWaterMark: 4000000 # 不建议配置 默认 32768 + highWaterMark: 80000000 # 不建议配置 默认 65536 + businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10 + businessQueueSize: 100000 #业务队列 默认=100000 + # globalReadWriteSize: 10000000,100000000 全局读写大小限制 + # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制 + ssl: # ssl配置 + enable: false # 开关 + key: /user/server.key # 指定ssl文件 默认系统生成 + crt: /user/server.crt # 指定ssl文件 默认系统生成 + http: # http相关配置 端口固定60000 + enable: false # 开关 + accessLog: true # http访问日志 + ssl: # ssl配置 + enable: false + admin: # 后台管理配置 + enable: true # 开关 + username: smqtt # 访问用户名 + password: smqtt # 访问密码 + ws: # websocket配置 + enable: false # 开关 + port: 8999 # 端口 + path: /mqtt # ws 的访问path mqtt.js请设置此选项 + cluster: # 集群配置 + enable: false # 集群开关 + url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点 + port: 7771 # 端口 + node: node-1 # 集群节点名称 唯一 + external: + host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项 + port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 + redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】 + mode: sentinel + database: 7 + password: xuanwu-T3st*17 + timeout: 3000 + poolMinIdle: 8 + poolConnTimeout: 3000 + poolSize: 10 + single: + address: 172.16.1.16:6392 + cluster: + scanInterval: 1000 + nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392 + readMode: SLAVE + retryAttempts: 3 + slaveConnectionPoolSize: 64 + masterConnectionPoolSize: 64 + retryInterval: 1500 + sentinel: + master: mq_master + nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373 \ No newline at end of file diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml deleted file mode 100644 index e4203080c7a10e0811258fd56cb1cf8b0f78465e..0000000000000000000000000000000000000000 --- a/smqtt-bootstrap/src/main/resources/test.yaml +++ /dev/null @@ -1,3 +0,0 @@ -smqtt: - tcp: - port: 7000 \ No newline at end of file diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java index ad5a890440d307136f1e9d9571441c9df4f33736..bd7da1b0cf77f7b9c6426c927b41ca9b3459ae3f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java @@ -14,6 +14,9 @@ public class BootstrapKey { /*redis前缀key*/ public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:"; + /*redis前缀key 离线消息*/ + public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:"; + /*模式*/ public static final String REDIS_MODE = "redis.mode"; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java @@ -78,6 +78,7 @@ public class MessageProxy { .topic(header.topicName()) .retain(fixedHeader.isRetain()) .qos(fixedHeader.qosLevel().value()) + .properties(header.properties()) .build(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.util.internal.StringUtil; import lombok.AllArgsConstructor; import lombok.Builder; @@ -31,6 +32,8 @@ public class HeapMqttMessage { private byte[] message; + private MqttProperties properties; + public Map getKeyMap() { Map keys = new HashMap<>(5); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java index c21887dba432d7aa5498b958707ec25a005e632f..9bc4f8acf16be246410d4c927fd2f3f9bb960b79 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java @@ -47,4 +47,18 @@ public interface MessageRegistry extends StartUp { List getRetainMessage(String topic); + /** + * 持久化离线消息 + * + * @param message + */ + void saveOfflineMessage(RetainMessage message); + + /** + * 获取离线消息 + * + * @param topic + * @return + */ + List getOfflineMessage(String topic); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..6d807b43de3ffc8e3547823fef5a9e5c02c5a136 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.*; import java.util.List; +import java.util.Map; + +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; /** @@ -11,6 +18,32 @@ import java.util.List; */ public class MqttMessageBuilder { + private static MqttProperties genMqttProperties(Map userPropertiesMap) { + MqttProperties mqttProperties = null; + if (userPropertiesMap != null) { + mqttProperties = new MqttProperties(); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (Map.Entry entry : userPropertiesMap.entrySet()) { + userProperties.add(entry.getKey(), entry.getValue()); + } + mqttProperties.add(userProperties); + } + return mqttProperties; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap)); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); @@ -69,7 +102,27 @@ public class MqttMessageBuilder { return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader); } - public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) { + public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) { + if (MqttVersion.MQTT_5.protocolLevel() != (byte) version) { + switch (connectReturnCode) { + case CONNECTION_REFUSED_IDENTIFIER_REJECTED: + connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; + break; + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: + connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; + break; + case CONNECTION_REFUSED_SERVER_UNAVAILABLE: + connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; + break; + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: + connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; + break; + case CONNECTION_REFUSED_NOT_AUTHORIZED: + connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5; + break; + + } + } MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..8f64c4cd9d5bd683a1240c1f32ffb6e537af4080 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -22,12 +27,27 @@ public class RetainMessage { private byte[] body; + private String userProperties; + public static RetainMessage of(MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return RetainMessage.builder() .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil + .map2Json(Optional + .ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).get())) .build(); } @@ -37,7 +57,8 @@ public class RetainMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index a3596af3aa8e4686cf1786d39598480c42efc2d8..2337caebeb9a07fb147c64fa985859e43955ec30 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +32,8 @@ public class SessionMessage { private boolean retain; + private String userProperties; + public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return SessionMessage.builder() @@ -35,6 +42,19 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil + .map2Json(Optional + .ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).get())) .build(); } @@ -44,7 +64,8 @@ public class SessionMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java index fa197573b91107d755751afe9c9cf6c6e2877aa1..f25c2789230a5855f43f1a7e62446a2aee7d191f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java @@ -67,7 +67,9 @@ public class MessageUtils { MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength()); - MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId); + // mqtt 5 support properties + System.out.println( "----" + mqttPublishVariableHeader.properties().getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) ); + MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties()); return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy()); } diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index dbb26bfb59fcfad27bf26045f1afee2353a59415..e4e557ca1cb46362d5d78237ccb5985965ff4114 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -31,6 +31,11 @@ smqtt-metric-prometheus 1.1.2 + + io.github.quickmsg + smqtt-persistent-redis + 1.1.2 + \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..62d5f3a39dbf0be92f69cd5092cd3c3c2211d79f 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java @@ -56,7 +56,7 @@ public class ClusterReceiver { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, heapMqttMessage.getTopic(), - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE); } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6e6bd47a320e2d820938b5e5f7a4f306e5c15a18..e90042add3e51b4fc70d8ec7486c70ab5102a435 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -68,7 +68,7 @@ public class ConnectProtocol implements Protocol { if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { if (channelRegistry.exists(clientIdentifier)) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } else { @@ -78,16 +78,16 @@ public class ConnectProtocol implements Protocol { existMqttChannel.close().subscribe(); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() - && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { + && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } /*password check*/ @@ -147,11 +147,11 @@ public class ConnectProtocol implements Protocol { eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext); - return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false) + return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.version()), false) .then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel))); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } catch (Exception e) { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index 1a405e0799b2a5f9ce21db92daa533e2fa29afbe..864b8bb624cba3d5e7386da426d8863c54b43ed8 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -30,6 +30,8 @@ public class PublishProtocol implements Protocol { private static List MESSAGE_TYPE_LIST = new ArrayList<>(); + private static String ONE2ONE_TOPIC_PREFIX = "ipush/mt/msg/"; + static { MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH); } @@ -52,6 +54,10 @@ public class PublishProtocol implements Protocol { MessageRegistry messageRegistry = receiveContext.getMessageRegistry(); Set mqttChannels = topicRegistry.getSubscribesByTopic(variableHeader.topicName(), message.fixedHeader().qosLevel()); + // 定制化处理,一对一发送的离线消息进行持久化,保证送达 + if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) { + messageRegistry.saveOfflineMessage(RetainMessage.of(message)); + } // http mock if (mqttChannel.getIsMock()) { return send(mqttChannels, message, messageRegistry, filterRetainMessage(message, messageRegistry)); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java index 6a1c2f4d6409c53364b74d21a25338d99eb66d72..4cb98a0bf7a51b871358ebd04c93ecee95b53dbe 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java @@ -11,13 +11,13 @@ import io.github.quickmsg.common.protocol.Protocol; import io.github.quickmsg.common.topic.SubscribeTopic; import io.github.quickmsg.common.topic.TopicRegistry; import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -40,7 +40,10 @@ public class SubscribeProtocol implements Protocol { Set mqttTopicSubscriptions = message.payload().topicSubscriptions() .stream() - .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName())) + .peek(mqttTopicSubscription -> { + this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + this.loadOfflineMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + }) .map(mqttTopicSubscription -> new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel)) .collect(Collectors.toSet()); @@ -57,6 +60,12 @@ public class SubscribeProtocol implements Protocol { .collect(Collectors.toList())), false)); } + /** + * 获取保留消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { messageRegistry.getRetainMessage(topicName) .forEach(retainMessage -> @@ -64,6 +73,19 @@ public class SubscribeProtocol implements Protocol { .subscribe()); } + /** + * 获取离线消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ + private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { + Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList -> + msgList.forEach(retainMessage -> + mqttChannel.write(retainMessage.toPublishMessage(mqttChannel), retainMessage.getQos() > 0) + .subscribe())); + } + @Override public List getMqttMessageTypes() { return MESSAGE_TYPE_LIST; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java index 256d8170f42227019a8c1ec680095a9ecc04b99f..b3f2919bdae5b89530102250afe2ecc4e84ae88e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java @@ -5,6 +5,7 @@ import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,4 +48,15 @@ public class DefaultMessageRegistry implements MessageRegistry { .collect(Collectors.toList()); } + @Override + public List getOfflineMessage(String topic) { + // TODO impl + return Collections.EMPTY_LIST; + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + // TODO impl + } + } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java index 92992c29dee58d8e71718b6cf1bbaf86452e9a35..bde255509e152554dad6aa185955d78f79978061 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java @@ -198,4 +198,15 @@ public class DbMessageRegistry implements MessageRegistry { return StringUtils.isBlank(body) ? null : body.getBytes(CharsetUtil.UTF_8); } + @Override + public List getOfflineMessage(String topic) { + // TODO impl + return Collections.EMPTY_LIST; + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + // TODO impl + } + } diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java index f30dc0208e235749b77df0392bfca0e6fdbb4c5b..086c987049d3960ec93cd0f42d93a0d164b7b89d 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java @@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable { private byte[] body; + private String userProperties; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java index 2966a12546e27774fa61ae8c630f5a11c8bbcd81..418d356b3e9f8539f0d90a4a41ca1b424898e5c5 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java @@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable { private Boolean retain; + private String userProperties; + private byte[] body; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java index a24f5a978ac3192ed52b45a166feac363b035337..a31e2df9eb06d505f872b80c95d9d7c4a3cc88a0 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java @@ -2,7 +2,6 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.bootstrap.BootstrapKey; import io.github.quickmsg.common.config.BootstrapConfig; -import io.github.quickmsg.common.environment.EnvContext; import io.github.quickmsg.common.message.MessageRegistry; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; @@ -15,10 +14,11 @@ import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBucket; import org.redisson.api.RKeys; import org.redisson.api.RList; +import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; -import java.sql.Connection; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -59,6 +59,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(record.getQos()) .retain(record.getRetain()) .body(record.getBody()) + .userProperties(record.getUserProperties()) .build() ).collect(Collectors.toList()); @@ -79,6 +80,7 @@ public class RedisMessageRegistry implements MessageRegistry { int qos = sessionMessage.getQos(); boolean retain = sessionMessage.isRetain(); byte[] body = sessionMessage.getBody(); + String userProperty = sessionMessage.getUserProperties(); try { SessionMessageEntity sessionMessageEntity = SessionMessageEntity.builder() @@ -87,6 +89,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(qos) .body(body) .retain(retain) + .userProperties(userProperty) .createTime(new Date()).build(); RList list = redissonClient.getList(BootstrapKey.Redis.REDIS_SESSION_MESSAGE_PREFIX_KEY + clientIdentifier); @@ -110,6 +113,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(topic) .qos(qos) .body(retainMessage.getBody()) + .userProperties(retainMessage.getUserProperties()) .createTime(date) .updateTime(date).build(); @@ -137,6 +141,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(item.getTopic()) .qos(item.getQos()) .body(item.getBody()) + .userProperties(item.getUserProperties()) .build() ).orElse(null); }) @@ -148,4 +153,49 @@ public class RedisMessageRegistry implements MessageRegistry { return Collections.emptyList(); } + @Override + public List getOfflineMessage(String topic) { + // 一对一发送,只需要精确匹配 + try { + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + List resList = set.stream().map(record -> RetainMessage.builder() + .topic(record.getTopic()) + .qos(record.getQos()) + .body(record.getBody()) + .userProperties(record.getUserProperties()) + .build() + ).collect(Collectors.toList()); + + if (set.size() > 0) { + redissonClient.getBucket(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic).delete(); + } + return resList; + } catch (Exception e) { + log.error("getOfflineMessage error clientIdentifier:{}", topic, e); + return Collections.emptyList(); + } + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + String topic = offlineMessage.getTopic(); + int qos = offlineMessage.getQos(); + byte[] body = offlineMessage.getBody(); + String userProperties = offlineMessage.getUserProperties(); + try { + RetainMessageEntity offlineMessageEntity = RetainMessageEntity.builder() + .topic(topic) + .qos(qos) + .body(body) + .userProperties(userProperties) + .createTime(new Date()) + .build(); + + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + set.add(offlineMessageEntity, 1, TimeUnit.DAYS); + } catch (Exception e) { + log.error("saveOfflineMessage error message: {}", topic, e); + } + } + } diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java index 2b28c1e7300118dbe560a73bdef6a1f23472eddc..f320d965bc38d88abde60ed0c62d20f9386c815e 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java @@ -47,7 +47,7 @@ public class TopicRuleNode implements RuleNode { log.info("rule engine TopicRuleNode request {}", heapMqttMessage); ProtocolAdaptor protocolAdaptor = receiveContext.getProtocolAdaptor(); protocolAdaptor.chooseProtocol(MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()), - new SmqttMessage<>(getMqttMessage(heapMqttMessage),heapMqttMessage.getTimestamp(),Boolean.TRUE), receiveContext); + new SmqttMessage<>(getMqttMessage(heapMqttMessage), heapMqttMessage.getTimestamp(), Boolean.TRUE), receiveContext); executeNext(contextView); } @@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, this.topic, - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), + heapMqttMessage.getProperties()); }