diff --git a/README.md b/README.md
index a4b2823d58c6576ba14adb133274d1b85c591253..ab66c11de6b93fff8b21b9c257d3bef7d8e40aec 100644
--- a/README.md
+++ b/README.md
@@ -49,11 +49,11 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor
> 大家不要恶意链接,谢谢!
-| 管理 | 说明 | 其他 |
-| ---- | ---- |---- |
-| 123.57.69.210:1883 | mqtt端口 |用户名:smqtt 密码:smqtt |
-| 123.57.69.210:8999 | mqtt over websocket |用户名:smqtt 密码:smqtt |
-| http://123.57.69.210:60000/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt |
+| 管理 | 说明 | 其他 |
+|----------------------------------------| ---- |---- |
+| 113.90.145.99:18886 | mqtt端口 |用户名:smqtt 密码:smqtt |
+| 113.90.145.99:18888 | mqtt over websocket |用户名:smqtt 密码:smqtt |
+| http://113.90.145.99:18887/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt |
## 启动方式
@@ -216,7 +216,20 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1
[config.yaml](config/config.yaml)
-5. 启动springboot服务服务即可
+4. 启动springboot服务服务即可
+5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖
+```xml
+
+ io.projectreactor
+ reactor-core
+ 3.4.9
+
+
+ io.projectreactor.netty
+ reactor-netty
+ 1.0.10
+
+```
## 官网地址
diff --git a/config/config.yaml b/config/config.yaml
index 3823b0480e4d925481cdaec0a8d1c1ccfeaa4415..ef1618b3e443e8712cbb63e156fd28a594d4378b 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -1,6 +1,7 @@
smqtt:
logLevel: INFO # 系统日志
tcp: # tcp配置
+ connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢
port: 1883 # mqtt端口号
username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
@@ -21,7 +22,7 @@ smqtt:
key: /user/server.key # 指定ssl文件 默认系统生成
crt: /user/server.crt # 指定ssl文件 默认系统生成
http: # http相关配置 端口固定60000
- enable: true # 开关
+ enable: false # 开关
accessLog: true # http访问日志
ssl: # ssl配置
enable: false
@@ -30,7 +31,7 @@ smqtt:
username: smqtt # 访问用户名
password: smqtt # 访问密码
ws: # websocket配置
- enable: true # 开关
+ enable: false # 开关
port: 8999 # 端口
path: /mqtt # ws 的访问path mqtt.js请设置此选项
cluster: # 集群配置
diff --git a/pom.xml b/pom.xml
index 24644d0fb6327e684c6ae0c390b547925e2dabd8..2cfcc5b2dc6d43fe6574c489dd1bfdd71e570d52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
io.github.quickmsg
smqtt
- 1.1.0
+ 1.1.1
smqtt-common
smqtt-core
diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml
index 7128c5607359fdfeb93c497a769b286d56f8c7f3..9515123b701b5c52ecd8d3872c731f89aff90d5c 100644
--- a/smqtt-bootstrap/pom.xml
+++ b/smqtt-bootstrap/pom.xml
@@ -7,10 +7,10 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-bootstrap
- 1.1.0
+ 1.1.1
smqtt-bootstrap
http://www.example.com
@@ -45,17 +45,17 @@
io.github.quickmsg
smqtt-core
- 1.1.0
+ 1.1.1
smqtt-registry-scube
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-ui
io.github.quickmsg
- 1.1.0
+ 1.1.1
diff --git a/smqtt-bootstrap/src/test/java/topic/TopicTest.java b/smqtt-bootstrap/src/test/java/topic/TopicTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..d67f144ae17d85e0859923faf7a3bdae6390556d
--- /dev/null
+++ b/smqtt-bootstrap/src/test/java/topic/TopicTest.java
@@ -0,0 +1,53 @@
+package topic;
+
+import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.topic.SubscribeTopic;
+import io.github.quickmsg.common.topic.TopicRegistry;
+import io.github.quickmsg.core.spi.DefaultTopicRegistry;
+import io.netty.handler.codec.mqtt.MqttQoS;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.*;
+
+/**
+ * @author luxurong
+ */
+public class TopicTest {
+
+ static ExecutorService service = Executors.newFixedThreadPool(100);
+
+ private static TopicRegistry topicRegistry = new DefaultTopicRegistry();
+
+ private static Map channelMap = new ConcurrentHashMap<>();
+
+ public static void main(String[] args) {
+ CountDownLatch count = new CountDownLatch(500000);
+ for(int i=0;i<500000;i++){
+ service.execute(()->{
+ int index = new Random().nextInt(1000);
+ MqttChannel mqttChannel =channelMap.computeIfAbsent(index,in->{
+ MqttChannel mqttChannel1=new MqttChannel();
+ mqttChannel1.setTopics(new CopyOnWriteArraySet<>());
+ return mqttChannel1;
+ });
+ SubscribeTopic subscribeTopic=new SubscribeTopic(String.valueOf(index), MqttQoS.AT_MOST_ONCE,mqttChannel);
+ topicRegistry.registrySubscribeTopic(subscribeTopic);
+ topicRegistry.getAllTopics();
+ count.countDown();
+ });
+ }
+ try {
+ count.await();
+ Map> topics = topicRegistry.getAllTopics();
+ System.out.println(topics);
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+
+}
diff --git a/smqtt-common/pom.xml b/smqtt-common/pom.xml
index d00dca93b284cf4467ba32dc420de3d9bd745efd..57e3ca3a7273a588a0328c2963cceb3b00cdd24c 100644
--- a/smqtt-common/pom.xml
+++ b/smqtt-common/pom.xml
@@ -5,7 +5,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
jar
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java
new file mode 100644
index 0000000000000000000000000000000000000000..63c5fa5a4898cb9656561a8da23e429c5c998061
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java
@@ -0,0 +1,79 @@
+package io.github.quickmsg.common.ack;
+
+import io.netty.util.Timeout;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * @author luxurong
+ */
+@Slf4j
+public abstract class AbsAck implements Ack {
+
+ private final int maxRetrySize;
+
+ private int count = 1;
+
+ private volatile boolean died = false;
+
+ private final Runnable runnable;
+
+ private final AckManager ackManager;
+
+ private final int period;
+
+
+ private final Runnable cleaner;
+
+
+
+ protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable cleaner) {
+ this.maxRetrySize = maxRetrySize;
+ this.period = period;
+ this.runnable = runnable;
+ this.ackManager = ackManager;
+ this.cleaner= cleaner;
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (++count <= maxRetrySize+1 && !died ) {
+ try {
+ log.info("task retry send ...........");
+ runnable.run();
+ ackManager.addAck(this);
+ } catch (Exception e) {
+ log.error("Ack error ", e);
+ }
+
+ }
+ else {
+ cleaner.run();
+ }
+ }
+
+ @Override
+ public void stop() {
+ died = true;
+ log.info("retry task stop ...........");
+ ackManager.deleteAck(getId());
+ }
+
+
+ @Override
+ public void start() {
+ this.ackManager.addAck(this);
+ }
+
+ @Override
+ public int getTimed() {
+ return this.period * this.count;
+ }
+
+ @Override
+ public TimeUnit getTimeUnit() {
+ return TimeUnit.SECONDS;
+ }
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java
new file mode 100644
index 0000000000000000000000000000000000000000..822157c091bf697c3a31a2f9bd90d19e221bef96
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java
@@ -0,0 +1,23 @@
+package io.github.quickmsg.common.ack;
+
+import io.netty.util.TimerTask;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author luxurong
+ */
+public interface Ack extends TimerTask {
+
+ int getTimed();
+
+ TimeUnit getTimeUnit();
+
+ long getId();
+
+ void start();
+
+ void stop();
+
+
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..12a5f956f229c7c8520022cd8549ec1b9935b419
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java
@@ -0,0 +1,16 @@
+package io.github.quickmsg.common.ack;
+
+/**
+ * @author luxurong
+ */
+public interface AckManager {
+
+ void addAck(Ack ack);
+
+ Ack getAck(Long id);
+
+ void deleteAck(Long id);
+
+
+
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java
new file mode 100644
index 0000000000000000000000000000000000000000..ef4ca1c802e43ca61b86eb5ffeb50733afea3acc
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java
@@ -0,0 +1,21 @@
+package io.github.quickmsg.common.ack;
+
+/**
+ * @author luxurong
+ */
+
+public class RetryAck extends AbsAck {
+
+ private final long id;
+
+
+ public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable consumer) {
+ super(maxRetrySize, period, runnable, ackManager,consumer);
+ this.id = id;
+ }
+
+ @Override
+ public long getId() {
+ return id;
+ }
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..c1739a5d667f86c1eb0ee07d54fe0910220e0dcb
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java
@@ -0,0 +1,36 @@
+package io.github.quickmsg.common.ack;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author luxurong
+ */
+public class TimeAckManager extends HashedWheelTimer implements AckManager {
+
+ private final Map ackMap = new ConcurrentHashMap<>();
+
+ public TimeAckManager(long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ super( tickDuration, unit, ticksPerWheel);
+ }
+
+ @Override
+ public void addAck(Ack ack) {
+ ackMap.put(ack.getId(),ack);
+ this.newTimeout(ack,ack.getTimed(),ack.getTimeUnit());
+ }
+
+ @Override
+ public Ack getAck(Long id) {
+ return ackMap.get(id);
+ }
+
+ @Override
+ public void deleteAck(Long id) {
+ ackMap.remove(id);
+ }
+
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java
index edc8c28c694a6180ea9160a5a41e1fa9fe85b253..aa8f7410fe918bcd118af3086de92dd4869548d6 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java
@@ -1,6 +1,9 @@
package io.github.quickmsg.common.channel;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.github.quickmsg.common.ack.Ack;
+import io.github.quickmsg.common.ack.RetryAck;
+import io.github.quickmsg.common.ack.TimeAckManager;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.utils.MessageUtils;
@@ -74,6 +77,8 @@ public class MqttChannel {
private Disposable closeDisposable;
+ private TimeAckManager timeAckManager;
+
public void disposableClose() {
if (closeDisposable != null && !closeDisposable.isDisposed()) {
closeDisposable.dispose();
@@ -86,7 +91,7 @@ public class MqttChannel {
}
- public static MqttChannel init(Connection connection) {
+ public static MqttChannel init(Connection connection, TimeAckManager timeAckManager) {
MqttChannel mqttChannel = new MqttChannel();
mqttChannel.setTopics(new CopyOnWriteArraySet<>());
mqttChannel.setAtomicInteger(new AtomicInteger(0));
@@ -97,6 +102,7 @@ public class MqttChannel {
mqttChannel.setConnection(connection);
mqttChannel.setStatus(ChannelStatus.INIT);
mqttChannel.setAddress(connection.address().toString());
+ mqttChannel.setTimeAckManager(timeAckManager);
return mqttChannel;
}
@@ -179,6 +185,11 @@ public class MqttChannel {
}
+ public long generateId(MqttMessageType type, Integer messageId) {
+ return (long) connection.channel().hashCode() << 5 | (long) type.value() << 4 | messageId;
+ }
+
+
/**
* 写入消息
*
@@ -188,10 +199,10 @@ public class MqttChannel {
*/
public Mono write(MqttMessage mqttMessage, boolean retry) {
// http本地mock
- if (this.getIsMock()) {
+ if (this.getIsMock() && !this.active()) {
return Mono.empty();
} else {
- return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap);
+ return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry);
}
}
@@ -249,7 +260,7 @@ public class MqttChannel {
public static MqttMessageSink MQTT_SINK = new MqttMessageSink();
- public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry, Map> replyMqttMessageMap) {
+ public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry) {
if (log.isDebugEnabled()) {
log.debug("write channel {} message {}", mqttChannel.getConnection(), mqttMessage);
}
@@ -258,7 +269,14 @@ public class MqttChannel {
Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2
mqttChannel.write() method releases a reference count.
*/
- return mqttChannel.write(Mono.just(mqttMessage)).then(offerReply(getReplyMqttMessage(mqttMessage), mqttChannel, getMessageId(mqttMessage), replyMqttMessageMap));
+ MqttMessage reply = getReplyMqttMessage(mqttMessage);
+
+ Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe();
+ Runnable cleaner = () -> MessageUtils.safeRelease(reply);;
+ Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)),
+ 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner);
+ ack.start();
+ return mqttChannel.write(Mono.just(mqttMessage)).then();
} else {
return mqttChannel.write(Mono.just(mqttMessage));
}
@@ -282,7 +300,6 @@ public class MqttChannel {
} else {
return mqttMessage;
}
-
}
@@ -311,21 +328,28 @@ public class MqttChannel {
}
- /**
- * Set resend action
- *
- * @param message {@link MqttMessage}
- * @param mqttChannel {@link MqttChannel}
- * @param messageId messageId
- * @param replyMqttMessageMap 重试缓存
- * @return 空操作符
- */
- public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) {
- return Mono.fromRunnable(() -> replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId, mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message))).delaySubscription(Duration.ofSeconds(5)).repeat(10, mqttChannel::isActive).doOnError(error -> {
- MessageUtils.safeRelease(message);
- log.error("offerReply", error);
- }).doOnCancel(() -> MessageUtils.safeRelease(message)).subscribe()));
- }
+// /**
+// * Set resend action
+// *
+// * @param message {@link MqttMessage}
+// * @param mqttChannel {@link MqttChannel}
+// * @param messageId messageId
+// * @param replyMqttMessageMap 重试缓存
+// * @return 空操作符
+// */
+// public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) {
+// return Mono.fromRunnable(() ->
+// replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId,
+// mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message)))
+// .delaySubscription(Duration.ofSeconds(5))
+// .repeat(10,mqttChannel::isActive)
+// .doOnError(error -> {
+// MessageUtils.safeRelease(message);
+// log.error("offerReply", error);
+// })
+// .doOnCancel(() -> MessageUtils.safeRelease(message))
+// .subscribe()));
+// }
}
@@ -333,4 +357,5 @@ public class MqttChannel {
public String toString() {
return "MqttChannel{" + " address='" + this.connection.address().toString() + '\'' + ", clientIdentifier='" + clientIdentifier + '\'' + ", status=" + status + ", keepalive=" + keepalive + ", username='" + username + '}';
}
+
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
index 806bb646d1c23b7fd0b17b495ddc5d3cea1ac107..7bb7ccfe187b9fc9f1363700c90fbfa0d43e8a79 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
@@ -2,6 +2,7 @@ package io.github.quickmsg.common.config;
import ch.qos.logback.classic.Level;
import com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.metric.MeterType;
import io.github.quickmsg.common.rule.RuleChainDefinition;
import io.github.quickmsg.common.rule.source.SourceDefinition;
@@ -110,6 +111,9 @@ public class BootstrapConfig {
@NoArgsConstructor
@AllArgsConstructor
public static class TcpConfig {
+
+ @Builder.Default
+ private ConnectModel connectModel = ConnectModel.UNIQUE;
/**
* 端口
*/
@@ -186,6 +190,11 @@ public class BootstrapConfig {
*/
Map childOptions;
+ /**
+ * PasswordAuthentication
+ */
+ PasswordAuthentication authentication;
+
}
@Data
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
index a7a277255c5ebe28719cdb3e4202cc427561dd5d..ca994d2b793e07445cb35b8d9d21f7d34250ec6e 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
@@ -5,6 +5,8 @@ package io.github.quickmsg.common.config;
*/
public interface Configuration {
+ ConnectModel getConnectModel();
+
/**
* netty boss线程数
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java
new file mode 100644
index 0000000000000000000000000000000000000000..2a703fde6ccb48e3b8ccc108aecb3e155b94853b
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java
@@ -0,0 +1,9 @@
+package io.github.quickmsg.common.config;
+
+/**
+ * @author luxurong
+ */
+public enum ConnectModel {
+ UNIQUE,
+ KICK
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java
index e351c1974cb69952896e0257bef23fa63c2f70b1..21a062f5354786f22c137889d15dafbacdf197af 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java
@@ -1,5 +1,6 @@
package io.github.quickmsg.common.context;
+import io.github.quickmsg.common.ack.TimeAckManager;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader;
@@ -99,5 +100,13 @@ public interface ReceiveContext extends BiConsumer extends HashMap {
+
+ public JsonMap() {
+ super();
+ }
+
+
+ public JsonMap(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ @Override
+ public String toString() {
+ return JacksonUtil.map2Json(this);
+ }
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java
index cb8a5c34d3d0fd0557c56881f0ec547611f7a40f..3ba292d3c2d38cdd51d57aa9843c9a7a1256fd99 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java
@@ -27,6 +27,7 @@ public class JacksonUtil {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
+
}
public static String bean2Json(Object data) {
diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml
index fc09429612fee2721b1239ad605ed55b4fe486d9..779e030fa08ac884a11f652137b704980bbaedb5 100644
--- a/smqtt-core/pom.xml
+++ b/smqtt-core/pom.xml
@@ -5,7 +5,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-core
@@ -14,22 +14,22 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-dsl
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-metric-influxdb
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-metric-prometheus
- 1.1.0
+ 1.1.1
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
index 00e2047a7306c4c923f73bb745e71967b9a15736..304178d8fe04715674288d46163d9c0fa60a7bbf 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
@@ -5,7 +5,6 @@ import io.github.quickmsg.common.config.BootstrapConfig;
import io.github.quickmsg.common.config.SslContext;
import io.github.quickmsg.common.rule.RuleChainDefinition;
import io.github.quickmsg.common.rule.source.SourceDefinition;
-import io.github.quickmsg.common.spi.DynamicLoader;
import io.github.quickmsg.common.transport.Transport;
import io.github.quickmsg.common.utils.BannerUtils;
import io.github.quickmsg.common.utils.LoggerLevel;
@@ -68,9 +67,16 @@ public class Bootstrap {
private MqttConfiguration initMqttConfiguration() {
MqttConfiguration mqttConfiguration = defaultConfiguration();
- if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) {
- mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword()));
+ if (tcpConfig.getAuthentication() != null) {
+ mqttConfiguration.setReactivePasswordAuth(tcpConfig.getAuthentication());
+ } else {
+ if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) {
+ mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword()));
+ } else {
+ mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true);
+ }
}
+ Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel);
Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort);
Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark);
Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark);
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
index 9ded36e263127f3bf04b924dc04640e527f28bd8..12fc2b003a703d0bb4a7593d0ae6078fcd258284 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
@@ -2,6 +2,7 @@ package io.github.quickmsg.core.http;
import io.github.quickmsg.common.config.BootstrapConfig;
import io.github.quickmsg.common.config.Configuration;
+import io.github.quickmsg.common.config.ConnectModel;
import io.github.quickmsg.common.config.SslContext;
import lombok.Data;
@@ -37,6 +38,11 @@ public class HttpConfiguration implements Configuration {
private BootstrapConfig.MeterConfig meterConfig;
+ @Override
+ public ConnectModel getConnectModel() {
+ return null;
+ }
+
@Override
public Integer getBusinessThreadSize() {
return 0;
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
index b95676590c6d62a83db505ace81a5eb3225731b0..f6f385179dbf3b890703f2ac852c79bd8f44982b 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
@@ -1,5 +1,6 @@
package io.github.quickmsg.core.mqtt;
+import io.github.quickmsg.common.ack.TimeAckManager;
import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader;
@@ -39,6 +40,7 @@ import reactor.core.scheduler.Schedulers;
import reactor.netty.resources.LoopResources;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* @author luxurong
@@ -74,6 +76,8 @@ public abstract class AbstractReceiveContext implements
private final TrafficHandlerLoader trafficHandlerLoader;
+ private final TimeAckManager timeAckManager;
+
public AbstractReceiveContext(T configuration, Transport transport) {
AbstractConfiguration abstractConfiguration = castConfiguration(configuration);
@@ -94,6 +98,7 @@ public abstract class AbstractReceiveContext implements
this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap());
this.metricManager = metricManager(abstractConfiguration.getMeterConfig());
Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource));
+ this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512);
}
private TrafficHandlerLoader trafficHandlerLoader() {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
index 1cd1552c90a11428c44a707f40122f8b3ad5905d..0ac9299cd9fe96d57d3f9445be00c83fc317b7f3 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
@@ -3,6 +3,7 @@ package io.github.quickmsg.core.mqtt;
import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.config.AbstractConfiguration;
import io.github.quickmsg.common.config.BootstrapConfig;
+import io.github.quickmsg.common.config.ConnectModel;
import io.github.quickmsg.common.config.SslContext;
import io.github.quickmsg.common.rule.RuleChainDefinition;
import io.github.quickmsg.common.rule.source.SourceDefinition;
@@ -34,6 +35,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private SslContext sslContext;
+ private ConnectModel connectModel;
+
private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true;
private Integer bossThreadSize = Runtime.getRuntime().availableProcessors();
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
index 49d07140ce49c04b5a699fcadbeefbe3af537ac7..f302dae0c64dd2287003b8c0d7873eef65896bf7 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
@@ -44,7 +44,7 @@ public class MqttReceiver extends AbstractSslHandler implements Receiver {
.addHandler(MqttEncoder.INSTANCE)
.addHandler(new MqttDecoder(mqttConfiguration.getMessageMaxSize()))
.addHandler(receiveContext.getTrafficHandlerLoader().get());
- receiveContext.apply(MqttChannel.init(connection));
+ receiveContext.apply(MqttChannel.init(connection,receiveContext.getTimeAckManager()));
});
}
}
\ No newline at end of file
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java
index a092240f8852027a36089bad74682f838da71fb0..237135ab76e4af7a6dd907c6e54321cd12fdf079 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java
@@ -1,5 +1,6 @@
package io.github.quickmsg.core.protocol;
+import io.github.quickmsg.common.ack.Ack;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
@@ -24,6 +25,7 @@ import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -48,6 +50,7 @@ public class CommonProtocol implements Protocol {
@Override
public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
+ ReceiveContext> receiveContext = contextView.get(ReceiveContext.class);
MqttMessage message = smqttMessage.getMessage();
switch (message.fixedHeader().messageType()) {
case PINGREQ:
@@ -64,8 +67,10 @@ public class CommonProtocol implements Protocol {
case PUBREC:
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int messageId = messageIdVariableHeader.messageId();
- return mqttChannel.cancelRetry(MqttMessageType.PUBLISH, messageId)
- .then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true));
+ return Mono.fromRunnable(() -> {
+ Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH, messageId)))
+ .ifPresent(Ack::stop);
+ }).then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true));
case PUBREL:
MqttMessageIdVariableHeader relMessageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int id = relMessageIdVariableHeader.messageId();
@@ -76,25 +81,30 @@ public class CommonProtocol implements Protocol {
*/
return mqttChannel.removeQos2Msg(id)
.map(msg -> {
- ReceiveContext> receiveContext = contextView.get(ReceiveContext.class);
TopicRegistry topicRegistry = receiveContext.getTopicRegistry();
MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
Set subscribeTopics = topicRegistry.getSubscribesByTopic(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel());
return Mono.when(
- subscribeTopics.stream()
- .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId())))
- .map(subscribeTopic -> subscribeTopic.getMqttChannel()
- .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(),
- subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0)
- ).collect(Collectors.toList()))
- .then(mqttChannel.cancelRetry(MqttMessageType.PUBREC, id))
+ subscribeTopics.stream()
+ .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId())))
+ .map(subscribeTopic -> subscribeTopic.getMqttChannel()
+ .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(),
+ subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0)
+ ).collect(Collectors.toList()))
+ .then(Mono.fromRunnable(() -> {
+ Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREC, id)))
+ .ifPresent(Ack::stop);
+ }))
.then(mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false));
}).orElseGet(() -> mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false));
case PUBCOMP:
MqttMessageIdVariableHeader messageIdVariableHeader1 = (MqttMessageIdVariableHeader) message.variableHeader();
int compId = messageIdVariableHeader1.messageId();
- return mqttChannel.cancelRetry(MqttMessageType.PUBREL, compId);
+ return Mono.fromRunnable(() -> {
+ Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREL, compId)))
+ .ifPresent(Ack::stop);
+ });
case PINGRESP:
default:
return Mono.empty();
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
index cf17efd5d73740bcb081cd07114e2a379019257f..6d17c31a495baa1cf0555bf66f4c09ef2faec291 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
@@ -3,6 +3,7 @@ package io.github.quickmsg.core.protocol;
import io.github.quickmsg.common.auth.PasswordAuthentication;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.config.ConnectModel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.enums.Event;
@@ -64,10 +65,15 @@ public class ConnectProtocol implements Protocol {
MetricManager metricManager = mqttReceiveContext.getMetricManager();
PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication();
/*check clientIdentifier exist*/
- if (channelRegistry.exists(clientIdentifier)) {
- return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
- false).then(mqttChannel.close());
+ if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) {
+ if (channelRegistry.exists(clientIdentifier)) {
+ return mqttChannel.write(
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
+ false).then(mqttChannel.close());
+ }
+ } else {
+ Optional.ofNullable( channelRegistry.get(clientIdentifier))
+ .ifPresent(ch->ch.close().subscribe());
}
/*protocol version support*/
if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java
index cae7f290fa94099621d4a22a7feb4ff9dc8a3ae8..8d7e6b9d5245adc2caa6a7650150ea5677315186 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java
@@ -1,6 +1,8 @@
package io.github.quickmsg.core.protocol;
+import io.github.quickmsg.common.ack.Ack;
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.protocol.Protocol;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -12,6 +14,7 @@ import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/**
* @author luxurong
@@ -28,10 +31,14 @@ public class PublishAckProtocol implements Protocol {
@Override
public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
- MqttPubAckMessage message = smqttMessage.getMessage();
- MqttMessageIdVariableHeader idVariableHeader = message.variableHeader();
- int messageId = idVariableHeader.messageId();
- return mqttChannel.cancelRetry(MqttMessageType.PUBLISH,messageId);
+ return Mono.fromRunnable(()->{
+ ReceiveContext> receiveContext = contextView.get(ReceiveContext.class);
+ MqttPubAckMessage message = smqttMessage.getMessage();
+ MqttMessageIdVariableHeader idVariableHeader = message.variableHeader();
+ int messageId = idVariableHeader.messageId();
+ Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH,messageId)))
+ .ifPresent(Ack::stop);
+ });
}
@Override
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java
index 7f0235b63470b14d538dbf0c55e2495c961aa350..4007bacd823fe05b123456d3c8e4c2680278319e 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java
@@ -45,11 +45,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor {
@Override
public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) {
MqttMessage mqttMessage = smqttMessage.getMessage();
- log.info(" 【{}】【{}】 【{}】",
- Thread.currentThread().getName(),
- mqttMessage.fixedHeader().messageType(),
- mqttChannel);
if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) {
+ log.info(" 【{}】【{}】 【{}】",
+ Thread.currentThread().getName(),
+ mqttMessage.fixedHeader().messageType(),
+ mqttChannel);
Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType()))
.ifPresent(protocol -> protocol
.doParseProtocol(smqttMessage, mqttChannel)
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java
index 3f7da321b4018e09812c2d39a09e1d95769bb849..5f350a3cd3c4ae2e9ea1e4da7e5efccae14975ef 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java
@@ -53,7 +53,7 @@ public class WebSocketMqttReceiver extends AbstractSslHandler implements Receive
.addHandlerLast(new ByteBufToWebSocketFrameEncoder())
.addHandlerLast(new MqttDecoder(mqttConfiguration.getMessageMaxSize()))
.addHandlerLast(MqttEncoder.INSTANCE);
- receiveContext.apply(MqttChannel.init(connection));
+ receiveContext.apply(MqttChannel.init(connection, receiveContext.getTimeAckManager()));
});
}
diff --git a/smqtt-metric/pom.xml b/smqtt-metric/pom.xml
index 4bb32c80feb4d7a19459dae4bc025e84ca70625e..cde0fe2a88d57e4ccdc6b2090a80d4a6bac2f719 100644
--- a/smqtt-metric/pom.xml
+++ b/smqtt-metric/pom.xml
@@ -8,7 +8,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-metric
diff --git a/smqtt-metric/smqtt-metric-influxdb/pom.xml b/smqtt-metric/smqtt-metric-influxdb/pom.xml
index 74e7dc5a2ed6d1260cdd1a928d504f424be50796..c16cf20b200e9ae4c1aac2408d94d3866b606e27 100644
--- a/smqtt-metric/smqtt-metric-influxdb/pom.xml
+++ b/smqtt-metric/smqtt-metric-influxdb/pom.xml
@@ -5,7 +5,7 @@
smqtt-metric
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
@@ -15,7 +15,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
diff --git a/smqtt-metric/smqtt-metric-prometheus/pom.xml b/smqtt-metric/smqtt-metric-prometheus/pom.xml
index 3b84a9a5487f1e2dbbd6c343baff341c2641ae96..86aad8e487837764ee6e50ddc52635205d882ffa 100644
--- a/smqtt-metric/smqtt-metric-prometheus/pom.xml
+++ b/smqtt-metric/smqtt-metric-prometheus/pom.xml
@@ -5,7 +5,7 @@
smqtt-metric
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
@@ -19,7 +19,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
diff --git a/smqtt-persistent/pom.xml b/smqtt-persistent/pom.xml
index 1d30ce8da57827cff3dcd4d0d8159b2c32ea5e13..13c67c73872af57c9f41c396f42213e91cdfc87d 100644
--- a/smqtt-persistent/pom.xml
+++ b/smqtt-persistent/pom.xml
@@ -5,7 +5,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
pom
diff --git a/smqtt-persistent/smqtt-persistent-db/pom.xml b/smqtt-persistent/smqtt-persistent-db/pom.xml
index 370e12fdd9bd8d2f7079b287149f3f3687181855..025a1325fb3109166295f50d30bbab037d42548c 100644
--- a/smqtt-persistent/smqtt-persistent-db/pom.xml
+++ b/smqtt-persistent/smqtt-persistent-db/pom.xml
@@ -5,12 +5,12 @@
smqtt-persistent
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-persistent-db
- 1.1.0
+ 1.1.1
3.14.11
@@ -20,7 +20,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
compile
diff --git a/smqtt-persistent/smqtt-persistent-redis/pom.xml b/smqtt-persistent/smqtt-persistent-redis/pom.xml
index 022d243f2cd9c9b6e45e98eaa3277da43c274f4f..9124bac67b0fd27e9a6b2869e250d58cf66c4b1e 100644
--- a/smqtt-persistent/smqtt-persistent-redis/pom.xml
+++ b/smqtt-persistent/smqtt-persistent-redis/pom.xml
@@ -5,12 +5,12 @@
smqtt-persistent
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-persistent-redis
- 1.1.0
+ 1.1.1
3.15.6
@@ -20,7 +20,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
compile
diff --git a/smqtt-registry/pom.xml b/smqtt-registry/pom.xml
index fcec21ed17d9554c7c7e27e92808779badf19c55..a58f471e8a895791cf4f95d7443f625f87a9df40 100644
--- a/smqtt-registry/pom.xml
+++ b/smqtt-registry/pom.xml
@@ -5,7 +5,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
pom
diff --git a/smqtt-registry/smqtt-registry-ignite/pom.xml b/smqtt-registry/smqtt-registry-ignite/pom.xml
index 4742dca32345dcaa9a9e4b711bc56c9fce68b2f9..c35895ec9d87535c63665760223c699a365d5b1d 100644
--- a/smqtt-registry/smqtt-registry-ignite/pom.xml
+++ b/smqtt-registry/smqtt-registry-ignite/pom.xml
@@ -5,7 +5,7 @@
smqtt-registry
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
diff --git a/smqtt-registry/smqtt-registry-scube/pom.xml b/smqtt-registry/smqtt-registry-scube/pom.xml
index ea2d4677cd950d0c840146d52024768e8c47469f..0dbe4f4e6dc9385b0532d6f74a3199fd170f7535 100644
--- a/smqtt-registry/smqtt-registry-scube/pom.xml
+++ b/smqtt-registry/smqtt-registry-scube/pom.xml
@@ -5,7 +5,7 @@
smqtt-registry
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-registry-scube
@@ -50,7 +50,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
provided
diff --git a/smqtt-rule/pom.xml b/smqtt-rule/pom.xml
index da4262acbadea0d0b55a8e0a8b4289590b0b667d..a1a6380c908d9394f3a4d686f89e5d02382df5cd 100644
--- a/smqtt-rule/pom.xml
+++ b/smqtt-rule/pom.xml
@@ -7,7 +7,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-rule
diff --git a/smqtt-rule/smqtt-rule-dsl/pom.xml b/smqtt-rule/smqtt-rule-dsl/pom.xml
index 06d143ac23490f7372ebe17f57531a67798ca607..d663114c459dcf00129a337c936d09e1094674aa 100644
--- a/smqtt-rule/smqtt-rule-dsl/pom.xml
+++ b/smqtt-rule/smqtt-rule-dsl/pom.xml
@@ -5,7 +5,7 @@
smqtt-rule
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
@@ -15,13 +15,13 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
provided
smqtt-rule-engine
io.github.quickmsg
- 1.1.0
+ 1.1.1
diff --git a/smqtt-rule/smqtt-rule-engine/pom.xml b/smqtt-rule/smqtt-rule-engine/pom.xml
index 82bb16e9b7eca32a8f63b7d9d261055ac3e55690..fadf42c21063247f11d989ea1b2a5155a6af1a2f 100644
--- a/smqtt-rule/smqtt-rule-engine/pom.xml
+++ b/smqtt-rule/smqtt-rule-engine/pom.xml
@@ -7,7 +7,7 @@
smqtt-rule
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-rule-engine
@@ -18,44 +18,44 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
provided
io.github.quickmsg
smqtt-rule-source-kafka
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-source-http
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-source-rocketmq
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-source-rabbitmq
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-source-db
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-rule-source-mqtt
- 1.1.0
+ 1.1.1
diff --git a/smqtt-rule/smqtt-rule-source/pom.xml b/smqtt-rule/smqtt-rule-source/pom.xml
index 6c46a35e39de41960187ad1953caf91830601892..1153ff46c50a759e29a7fa54a140f0a435238e66 100644
--- a/smqtt-rule/smqtt-rule-source/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/pom.xml
@@ -8,7 +8,7 @@
smqtt-rule
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-rule-source
@@ -33,7 +33,7 @@
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
provided
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml
index 84389977c026787362d55aa6c6f405aab1ef13c9..868a99331d4f83d77e7ee21d8db755d045dc83f2 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml
@@ -5,13 +5,13 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
io.github.quickmsg
smqtt-rule-source-db
- 1.1.0
+ 1.1.1
3.14.11
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml
index 2dd03f0f6d3bea634c8ae1c2db361ed40d969fd8..8895ebb39527e34cfa3f8d906525ebc982e7739f 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml
@@ -6,7 +6,7 @@
io.github.quickmsg
smqtt-rule-source-http
- 1.1.0
+ 1.1.1
smqtt-rule-source-http
@@ -15,14 +15,14 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
io.github.quickmsg
smqtt-common
- 1.1.0
+ 1.1.1
provided
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml
index 243882f9fbfedd430788235bc2b7ecdd6ac1ff7a..78e6ccb4887af0febf0b1e4f7c1dd2f361a8a0cd 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml
@@ -5,12 +5,12 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-rule-source-kafka
- 1.1.0
+ 1.1.1
https://github.com/quickmsg/smqtt
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml
index 197f89d70bb5017db8989bb2ead790725ee7c85f..b35e4461cd83967157cac7cbea4ad56a9a65f5ba 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml
@@ -5,13 +5,13 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
io.github.quickmsg
smqtt-rule-source-mqtt
- 1.1.0
+ 1.1.1
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
index 6416db1bbb1a791edcf148b11daad6c693178952..342869400d9223a4aeddc376afe2d419f58716fa 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
@@ -7,6 +7,7 @@ import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import io.github.quickmsg.common.rule.source.Source;
import io.github.quickmsg.common.rule.source.SourceBean;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
@@ -91,12 +92,13 @@ public class MqttSourceBean implements SourceBean {
@Override
public void transmit(Map object) {
String topic = (String) object.get("topic");
- String msg = (String) object.get("msg");
+ Object msg =object.get("msg");
+ String bytes = msg instanceof Map ? JacksonUtil.map2Json((Map extends Object, ? extends Object>) msg): msg.toString();
Boolean retain = (Boolean) object.get("retain");
Integer qos = Optional.ofNullable((Integer)object.get("qos")).orElse(0);
client.publishWith()
.topic(topic)
- .payload(msg.getBytes())
+ .payload(bytes.getBytes())
.qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
.retain(retain)
.send()
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml
index a63c83a67984e0981a814f5985f444f09d15d3d6..92921f22ec40144d894ce5b16ba8f297ddfcf398 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml
@@ -5,13 +5,13 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
io.github.quickmsg
smqtt-rule-source-rabbitmq
- 1.1.0
+ 1.1.1
diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml
index f03aa491bd250288f03ff0d590fcd72c61dddc7b..17161390653e3cb185bd398d8956d6c0904bd38d 100644
--- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml
+++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml
@@ -5,13 +5,13 @@
smqtt-rule-source
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
io.github.quickmsg
smqtt-rule-source-rocketmq
- 1.1.0
+ 1.1.1
diff --git a/smqtt-spring-boot-starter/pom.xml b/smqtt-spring-boot-starter/pom.xml
index 56f05fac21621bccb5fe8386946de1a6c2908a3b..9f2a69b285bb24512354b6afd46addd16321db2a 100644
--- a/smqtt-spring-boot-starter/pom.xml
+++ b/smqtt-spring-boot-starter/pom.xml
@@ -7,7 +7,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-spring-boot-starter
@@ -34,17 +34,17 @@
io.github.quickmsg
smqtt-core
- 1.1.0
+ 1.1.1
smqtt-registry-scube
io.github.quickmsg
- 1.1.0
+ 1.1.1
smqtt-ui
io.github.quickmsg
- 1.1.0
+ 1.1.1
io.projectreactor.netty
diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
index 2186cce609873db5036f6a44e6f13f9598ef4dcc..60132949c90165647e253c05f617e0dabb137996 100644
--- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
+++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
@@ -1,6 +1,8 @@
package io.github.quickmsg.starter;
import ch.qos.logback.classic.Level;
+import io.github.quickmsg.common.auth.PasswordAuthentication;
+import io.github.quickmsg.common.config.ConnectModel;
import io.github.quickmsg.common.utils.IPUtils;
import io.github.quickmsg.core.Bootstrap;
import lombok.extern.slf4j.Slf4j;
@@ -15,7 +17,7 @@ import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@EnableConfigurationProperties(SpringBootstrapConfig.class)
-public class AutoMqttConfiguration {
+public class AutoMqttConfiguration {
/**
@@ -25,7 +27,8 @@ public class AutoMqttConfiguration {
* @return {@link Bootstrap}
*/
@Bean
- public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig) {
+ public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig, @Autowired(required = false) PasswordAuthentication authentication) {
+ check(springBootstrapConfig,authentication);
return Bootstrap.builder()
.rootLevel(Level.toLevel(springBootstrapConfig.getLogLevel()))
.tcpConfig(springBootstrapConfig.getTcp())
@@ -42,6 +45,15 @@ public class AutoMqttConfiguration {
.doOnSuccess(this::printUiUrl).block();
}
+ private void check(SpringBootstrapConfig springBootstrapConfig, PasswordAuthentication authentication) {
+ if(springBootstrapConfig.getTcp().getConnectModel() == null){
+ springBootstrapConfig.getTcp().setConnectModel(ConnectModel.UNIQUE);
+ }
+ if(authentication !=null){
+ springBootstrapConfig.getTcp().setAuthentication(authentication);
+ }
+ }
+
public void printUiUrl(Bootstrap bootstrap) {
String start = "\n-------------------------------------------------------------\n\t";
start += String.format("Smqtt mqtt connect url %s:%s \n\t", IPUtils.getIP(), bootstrap.getTcpConfig().getPort());
diff --git a/smqtt-ui/pom.xml b/smqtt-ui/pom.xml
index 545136425dae8e4e9723a0836e02640439865129..1beb638142261bdc7a0b77a36c1862cef1ea8b2e 100644
--- a/smqtt-ui/pom.xml
+++ b/smqtt-ui/pom.xml
@@ -5,7 +5,7 @@
smqtt
io.github.quickmsg
- 1.1.0
+ 1.1.1
4.0.0
smqtt-ui