diff --git a/README.md b/README.md index a4b2823d58c6576ba14adb133274d1b85c591253..ab66c11de6b93fff8b21b9c257d3bef7d8e40aec 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,11 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor > 大家不要恶意链接,谢谢! -| 管理 | 说明 | 其他 | -| ---- | ---- |---- | -| 123.57.69.210:1883 | mqtt端口 |用户名:smqtt 密码:smqtt | -| 123.57.69.210:8999 | mqtt over websocket |用户名:smqtt 密码:smqtt | -| http://123.57.69.210:60000/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | +| 管理 | 说明 | 其他 | +|----------------------------------------| ---- |---- | +| 113.90.145.99:18886 | mqtt端口 |用户名:smqtt 密码:smqtt | +| 113.90.145.99:18888 | mqtt over websocket |用户名:smqtt 密码:smqtt | +| http://113.90.145.99:18887/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | ## 启动方式 @@ -216,7 +216,20 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 [config.yaml](config/config.yaml) -5. 启动springboot服务服务即可 +4. 启动springboot服务服务即可 +5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖 +```xml + + io.projectreactor + reactor-core + 3.4.9 + + + io.projectreactor.netty + reactor-netty + 1.0.10 + +``` ## 官网地址 diff --git a/config/config.yaml b/config/config.yaml index 3823b0480e4d925481cdaec0a8d1c1ccfeaa4415..ef1618b3e443e8712cbb63e156fd28a594d4378b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,6 +1,7 @@ smqtt: logLevel: INFO # 系统日志 tcp: # tcp配置 + connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢 port: 1883 # mqtt端口号 username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 @@ -21,7 +22,7 @@ smqtt: key: /user/server.key # 指定ssl文件 默认系统生成 crt: /user/server.crt # 指定ssl文件 默认系统生成 http: # http相关配置 端口固定60000 - enable: true # 开关 + enable: false # 开关 accessLog: true # http访问日志 ssl: # ssl配置 enable: false @@ -30,7 +31,7 @@ smqtt: username: smqtt # 访问用户名 password: smqtt # 访问密码 ws: # websocket配置 - enable: true # 开关 + enable: false # 开关 port: 8999 # 端口 path: /mqtt # ws 的访问path mqtt.js请设置此选项 cluster: # 集群配置 diff --git a/pom.xml b/pom.xml index 24644d0fb6327e684c6ae0c390b547925e2dabd8..2cfcc5b2dc6d43fe6574c489dd1bfdd71e570d52 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 io.github.quickmsg smqtt - 1.1.0 + 1.1.1 smqtt-common smqtt-core diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index 7128c5607359fdfeb93c497a769b286d56f8c7f3..9515123b701b5c52ecd8d3872c731f89aff90d5c 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -7,10 +7,10 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-bootstrap - 1.1.0 + 1.1.1 smqtt-bootstrap http://www.example.com @@ -45,17 +45,17 @@ io.github.quickmsg smqtt-core - 1.1.0 + 1.1.1 smqtt-registry-scube io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-ui io.github.quickmsg - 1.1.0 + 1.1.1 diff --git a/smqtt-bootstrap/src/test/java/topic/TopicTest.java b/smqtt-bootstrap/src/test/java/topic/TopicTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d67f144ae17d85e0859923faf7a3bdae6390556d --- /dev/null +++ b/smqtt-bootstrap/src/test/java/topic/TopicTest.java @@ -0,0 +1,53 @@ +package topic; + +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.topic.SubscribeTopic; +import io.github.quickmsg.common.topic.TopicRegistry; +import io.github.quickmsg.core.spi.DefaultTopicRegistry; +import io.netty.handler.codec.mqtt.MqttQoS; + +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.*; + +/** + * @author luxurong + */ +public class TopicTest { + + static ExecutorService service = Executors.newFixedThreadPool(100); + + private static TopicRegistry topicRegistry = new DefaultTopicRegistry(); + + private static Map channelMap = new ConcurrentHashMap<>(); + + public static void main(String[] args) { + CountDownLatch count = new CountDownLatch(500000); + for(int i=0;i<500000;i++){ + service.execute(()->{ + int index = new Random().nextInt(1000); + MqttChannel mqttChannel =channelMap.computeIfAbsent(index,in->{ + MqttChannel mqttChannel1=new MqttChannel(); + mqttChannel1.setTopics(new CopyOnWriteArraySet<>()); + return mqttChannel1; + }); + SubscribeTopic subscribeTopic=new SubscribeTopic(String.valueOf(index), MqttQoS.AT_MOST_ONCE,mqttChannel); + topicRegistry.registrySubscribeTopic(subscribeTopic); + topicRegistry.getAllTopics(); + count.countDown(); + }); + } + try { + count.await(); + Map> topics = topicRegistry.getAllTopics(); + System.out.println(topics); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + +} diff --git a/smqtt-common/pom.xml b/smqtt-common/pom.xml index d00dca93b284cf4467ba32dc420de3d9bd745efd..57e3ca3a7273a588a0328c2963cceb3b00cdd24c 100644 --- a/smqtt-common/pom.xml +++ b/smqtt-common/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 jar diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java new file mode 100644 index 0000000000000000000000000000000000000000..63c5fa5a4898cb9656561a8da23e429c5c998061 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -0,0 +1,79 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.Timeout; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * @author luxurong + */ +@Slf4j +public abstract class AbsAck implements Ack { + + private final int maxRetrySize; + + private int count = 1; + + private volatile boolean died = false; + + private final Runnable runnable; + + private final AckManager ackManager; + + private final int period; + + + private final Runnable cleaner; + + + + protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable cleaner) { + this.maxRetrySize = maxRetrySize; + this.period = period; + this.runnable = runnable; + this.ackManager = ackManager; + this.cleaner= cleaner; + } + + @Override + public void run(Timeout timeout) throws Exception { + if (++count <= maxRetrySize+1 && !died ) { + try { + log.info("task retry send ..........."); + runnable.run(); + ackManager.addAck(this); + } catch (Exception e) { + log.error("Ack error ", e); + } + + } + else { + cleaner.run(); + } + } + + @Override + public void stop() { + died = true; + log.info("retry task stop ..........."); + ackManager.deleteAck(getId()); + } + + + @Override + public void start() { + this.ackManager.addAck(this); + } + + @Override + public int getTimed() { + return this.period * this.count; + } + + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.SECONDS; + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java new file mode 100644 index 0000000000000000000000000000000000000000..822157c091bf697c3a31a2f9bd90d19e221bef96 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java @@ -0,0 +1,23 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.TimerTask; + +import java.util.concurrent.TimeUnit; + +/** + * @author luxurong + */ +public interface Ack extends TimerTask { + + int getTimed(); + + TimeUnit getTimeUnit(); + + long getId(); + + void start(); + + void stop(); + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java new file mode 100644 index 0000000000000000000000000000000000000000..12a5f956f229c7c8520022cd8549ec1b9935b419 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java @@ -0,0 +1,16 @@ +package io.github.quickmsg.common.ack; + +/** + * @author luxurong + */ +public interface AckManager { + + void addAck(Ack ack); + + Ack getAck(Long id); + + void deleteAck(Long id); + + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java new file mode 100644 index 0000000000000000000000000000000000000000..ef4ca1c802e43ca61b86eb5ffeb50733afea3acc --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java @@ -0,0 +1,21 @@ +package io.github.quickmsg.common.ack; + +/** + * @author luxurong + */ + +public class RetryAck extends AbsAck { + + private final long id; + + + public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable consumer) { + super(maxRetrySize, period, runnable, ackManager,consumer); + this.id = id; + } + + @Override + public long getId() { + return id; + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java new file mode 100644 index 0000000000000000000000000000000000000000..c1739a5d667f86c1eb0ee07d54fe0910220e0dcb --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java @@ -0,0 +1,36 @@ +package io.github.quickmsg.common.ack; + +import io.netty.util.HashedWheelTimer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author luxurong + */ +public class TimeAckManager extends HashedWheelTimer implements AckManager { + + private final Map ackMap = new ConcurrentHashMap<>(); + + public TimeAckManager(long tickDuration, TimeUnit unit, int ticksPerWheel) { + super( tickDuration, unit, ticksPerWheel); + } + + @Override + public void addAck(Ack ack) { + ackMap.put(ack.getId(),ack); + this.newTimeout(ack,ack.getTimed(),ack.getTimeUnit()); + } + + @Override + public Ack getAck(Long id) { + return ackMap.get(id); + } + + @Override + public void deleteAck(Long id) { + ackMap.remove(id); + } + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index edc8c28c694a6180ea9160a5a41e1fa9fe85b253..aa8f7410fe918bcd118af3086de92dd4869548d6 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -1,6 +1,9 @@ package io.github.quickmsg.common.channel; import com.fasterxml.jackson.annotation.JsonIgnore; +import io.github.quickmsg.common.ack.Ack; +import io.github.quickmsg.common.ack.RetryAck; +import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.topic.SubscribeTopic; import io.github.quickmsg.common.utils.MessageUtils; @@ -74,6 +77,8 @@ public class MqttChannel { private Disposable closeDisposable; + private TimeAckManager timeAckManager; + public void disposableClose() { if (closeDisposable != null && !closeDisposable.isDisposed()) { closeDisposable.dispose(); @@ -86,7 +91,7 @@ public class MqttChannel { } - public static MqttChannel init(Connection connection) { + public static MqttChannel init(Connection connection, TimeAckManager timeAckManager) { MqttChannel mqttChannel = new MqttChannel(); mqttChannel.setTopics(new CopyOnWriteArraySet<>()); mqttChannel.setAtomicInteger(new AtomicInteger(0)); @@ -97,6 +102,7 @@ public class MqttChannel { mqttChannel.setConnection(connection); mqttChannel.setStatus(ChannelStatus.INIT); mqttChannel.setAddress(connection.address().toString()); + mqttChannel.setTimeAckManager(timeAckManager); return mqttChannel; } @@ -179,6 +185,11 @@ public class MqttChannel { } + public long generateId(MqttMessageType type, Integer messageId) { + return (long) connection.channel().hashCode() << 5 | (long) type.value() << 4 | messageId; + } + + /** * 写入消息 * @@ -188,10 +199,10 @@ public class MqttChannel { */ public Mono write(MqttMessage mqttMessage, boolean retry) { // http本地mock - if (this.getIsMock()) { + if (this.getIsMock() && !this.active()) { return Mono.empty(); } else { - return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap); + return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry); } } @@ -249,7 +260,7 @@ public class MqttChannel { public static MqttMessageSink MQTT_SINK = new MqttMessageSink(); - public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry, Map> replyMqttMessageMap) { + public Mono sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry) { if (log.isDebugEnabled()) { log.debug("write channel {} message {}", mqttChannel.getConnection(), mqttMessage); } @@ -258,7 +269,14 @@ public class MqttChannel { Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2 mqttChannel.write() method releases a reference count. */ - return mqttChannel.write(Mono.just(mqttMessage)).then(offerReply(getReplyMqttMessage(mqttMessage), mqttChannel, getMessageId(mqttMessage), replyMqttMessageMap)); + MqttMessage reply = getReplyMqttMessage(mqttMessage); + + Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe(); + Runnable cleaner = () -> MessageUtils.safeRelease(reply);; + Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)), + 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner); + ack.start(); + return mqttChannel.write(Mono.just(mqttMessage)).then(); } else { return mqttChannel.write(Mono.just(mqttMessage)); } @@ -282,7 +300,6 @@ public class MqttChannel { } else { return mqttMessage; } - } @@ -311,21 +328,28 @@ public class MqttChannel { } - /** - * Set resend action - * - * @param message {@link MqttMessage} - * @param mqttChannel {@link MqttChannel} - * @param messageId messageId - * @param replyMqttMessageMap 重试缓存 - * @return 空操作符 - */ - public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) { - return Mono.fromRunnable(() -> replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId, mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message))).delaySubscription(Duration.ofSeconds(5)).repeat(10, mqttChannel::isActive).doOnError(error -> { - MessageUtils.safeRelease(message); - log.error("offerReply", error); - }).doOnCancel(() -> MessageUtils.safeRelease(message)).subscribe())); - } +// /** +// * Set resend action +// * +// * @param message {@link MqttMessage} +// * @param mqttChannel {@link MqttChannel} +// * @param messageId messageId +// * @param replyMqttMessageMap 重试缓存 +// * @return 空操作符 +// */ +// public Mono offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map> replyMqttMessageMap) { +// return Mono.fromRunnable(() -> +// replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId, +// mqttChannel.write(Mono.fromCallable(() -> getDupMessage(message))) +// .delaySubscription(Duration.ofSeconds(5)) +// .repeat(10,mqttChannel::isActive) +// .doOnError(error -> { +// MessageUtils.safeRelease(message); +// log.error("offerReply", error); +// }) +// .doOnCancel(() -> MessageUtils.safeRelease(message)) +// .subscribe())); +// } } @@ -333,4 +357,5 @@ public class MqttChannel { public String toString() { return "MqttChannel{" + " address='" + this.connection.address().toString() + '\'' + ", clientIdentifier='" + clientIdentifier + '\'' + ", status=" + status + ", keepalive=" + keepalive + ", username='" + username + '}'; } + } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 806bb646d1c23b7fd0b17b495ddc5d3cea1ac107..7bb7ccfe187b9fc9f1363700c90fbfa0d43e8a79 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -2,6 +2,7 @@ package io.github.quickmsg.common.config; import ch.qos.logback.classic.Level; import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.metric.MeterType; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -110,6 +111,9 @@ public class BootstrapConfig { @NoArgsConstructor @AllArgsConstructor public static class TcpConfig { + + @Builder.Default + private ConnectModel connectModel = ConnectModel.UNIQUE; /** * 端口 */ @@ -186,6 +190,11 @@ public class BootstrapConfig { */ Map childOptions; + /** + * PasswordAuthentication + */ + PasswordAuthentication authentication; + } @Data diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java index a7a277255c5ebe28719cdb3e4202cc427561dd5d..ca994d2b793e07445cb35b8d9d21f7d34250ec6e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java @@ -5,6 +5,8 @@ package io.github.quickmsg.common.config; */ public interface Configuration { + ConnectModel getConnectModel(); + /** * netty boss线程数 diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java new file mode 100644 index 0000000000000000000000000000000000000000..2a703fde6ccb48e3b8ccc108aecb3e155b94853b --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/ConnectModel.java @@ -0,0 +1,9 @@ +package io.github.quickmsg.common.config; + +/** + * @author luxurong + */ +public enum ConnectModel { + UNIQUE, + KICK +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java index e351c1974cb69952896e0257bef23fa63c2f70b1..21a062f5354786f22c137889d15dafbacdf197af 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.common.context; +import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader; @@ -99,5 +100,13 @@ public interface ReceiveContext extends BiConsumer extends HashMap { + + public JsonMap() { + super(); + } + + + public JsonMap(int initialCapacity) { + super(initialCapacity); + } + + @Override + public String toString() { + return JacksonUtil.map2Json(this); + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java index cb8a5c34d3d0fd0557c56881f0ec547611f7a40f..3ba292d3c2d38cdd51d57aa9843c9a7a1256fd99 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/JacksonUtil.java @@ -27,6 +27,7 @@ public class JacksonUtil { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true); + } public static String bean2Json(Object data) { diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index fc09429612fee2721b1239ad605ed55b4fe486d9..779e030fa08ac884a11f652137b704980bbaedb5 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-core @@ -14,22 +14,22 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-dsl - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-metric-influxdb - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-metric-prometheus - 1.1.0 + 1.1.1 diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 00e2047a7306c4c923f73bb745e71967b9a15736..304178d8fe04715674288d46163d9c0fa60a7bbf 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -5,7 +5,6 @@ import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; -import io.github.quickmsg.common.spi.DynamicLoader; import io.github.quickmsg.common.transport.Transport; import io.github.quickmsg.common.utils.BannerUtils; import io.github.quickmsg.common.utils.LoggerLevel; @@ -68,9 +67,16 @@ public class Bootstrap { private MqttConfiguration initMqttConfiguration() { MqttConfiguration mqttConfiguration = defaultConfiguration(); - if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) { - mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword())); + if (tcpConfig.getAuthentication() != null) { + mqttConfiguration.setReactivePasswordAuth(tcpConfig.getAuthentication()); + } else { + if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) { + mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword())); + } else { + mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true); + } } + Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel); Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort); Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark); Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java index 9ded36e263127f3bf04b924dc04640e527f28bd8..12fc2b003a703d0bb4a7593d0ae6078fcd258284 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java @@ -2,6 +2,7 @@ package io.github.quickmsg.core.http; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.Configuration; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.config.SslContext; import lombok.Data; @@ -37,6 +38,11 @@ public class HttpConfiguration implements Configuration { private BootstrapConfig.MeterConfig meterConfig; + @Override + public ConnectModel getConnectModel() { + return null; + } + @Override public Integer getBusinessThreadSize() { return 0; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index b95676590c6d62a83db505ace81a5eb3225731b0..f6f385179dbf3b890703f2ac852c79bd8f44982b 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.core.mqtt; +import io.github.quickmsg.common.ack.TimeAckManager; import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader; @@ -39,6 +40,7 @@ import reactor.core.scheduler.Schedulers; import reactor.netty.resources.LoopResources; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * @author luxurong @@ -74,6 +76,8 @@ public abstract class AbstractReceiveContext implements private final TrafficHandlerLoader trafficHandlerLoader; + private final TimeAckManager timeAckManager; + public AbstractReceiveContext(T configuration, Transport transport) { AbstractConfiguration abstractConfiguration = castConfiguration(configuration); @@ -94,6 +98,7 @@ public abstract class AbstractReceiveContext implements this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap()); this.metricManager = metricManager(abstractConfiguration.getMeterConfig()); Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource)); + this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512); } private TrafficHandlerLoader trafficHandlerLoader() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 1cd1552c90a11428c44a707f40122f8b3ad5905d..0ac9299cd9fe96d57d3f9445be00c83fc317b7f3 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -3,6 +3,7 @@ package io.github.quickmsg.core.mqtt; import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.config.AbstractConfiguration; import io.github.quickmsg.common.config.BootstrapConfig; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -34,6 +35,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private SslContext sslContext; + private ConnectModel connectModel; + private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true; private Integer bossThreadSize = Runtime.getRuntime().availableProcessors(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java index 49d07140ce49c04b5a699fcadbeefbe3af537ac7..f302dae0c64dd2287003b8c0d7873eef65896bf7 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java @@ -44,7 +44,7 @@ public class MqttReceiver extends AbstractSslHandler implements Receiver { .addHandler(MqttEncoder.INSTANCE) .addHandler(new MqttDecoder(mqttConfiguration.getMessageMaxSize())) .addHandler(receiveContext.getTrafficHandlerLoader().get()); - receiveContext.apply(MqttChannel.init(connection)); + receiveContext.apply(MqttChannel.init(connection,receiveContext.getTimeAckManager())); }); } } \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java index a092240f8852027a36089bad74682f838da71fb0..237135ab76e4af7a6dd907c6e54321cd12fdf079 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java @@ -1,5 +1,6 @@ package io.github.quickmsg.core.protocol; +import io.github.quickmsg.common.ack.Ack; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; @@ -24,6 +25,7 @@ import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +50,7 @@ public class CommonProtocol implements Protocol { @Override public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) { + ReceiveContext receiveContext = contextView.get(ReceiveContext.class); MqttMessage message = smqttMessage.getMessage(); switch (message.fixedHeader().messageType()) { case PINGREQ: @@ -64,8 +67,10 @@ public class CommonProtocol implements Protocol { case PUBREC: MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int messageId = messageIdVariableHeader.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBLISH, messageId) - .then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true)); + return Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH, messageId))) + .ifPresent(Ack::stop); + }).then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true)); case PUBREL: MqttMessageIdVariableHeader relMessageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int id = relMessageIdVariableHeader.messageId(); @@ -76,25 +81,30 @@ public class CommonProtocol implements Protocol { */ return mqttChannel.removeQos2Msg(id) .map(msg -> { - ReceiveContext receiveContext = contextView.get(ReceiveContext.class); TopicRegistry topicRegistry = receiveContext.getTopicRegistry(); MessageRegistry messageRegistry = receiveContext.getMessageRegistry(); Set subscribeTopics = topicRegistry.getSubscribesByTopic(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel()); return Mono.when( - subscribeTopics.stream() - .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId()))) - .map(subscribeTopic -> subscribeTopic.getMqttChannel() - .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), - subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0) - ).collect(Collectors.toList())) - .then(mqttChannel.cancelRetry(MqttMessageType.PUBREC, id)) + subscribeTopics.stream() + .filter(subscribeTopic -> filterOfflineSession(subscribeTopic.getMqttChannel(), messageRegistry, MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), subscribeTopic.getMqttChannel().generateMessageId()))) + .map(subscribeTopic -> subscribeTopic.getMqttChannel() + .write(MessageUtils.wrapPublishMessage(msg, subscribeTopic.getQoS(), + subscribeTopic.getMqttChannel().generateMessageId()), subscribeTopic.getQoS().value() > 0) + ).collect(Collectors.toList())) + .then(Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREC, id))) + .ifPresent(Ack::stop); + })) .then(mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false)); }).orElseGet(() -> mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false)); case PUBCOMP: MqttMessageIdVariableHeader messageIdVariableHeader1 = (MqttMessageIdVariableHeader) message.variableHeader(); int compId = messageIdVariableHeader1.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBREL, compId); + return Mono.fromRunnable(() -> { + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBREL, compId))) + .ifPresent(Ack::stop); + }); case PINGRESP: default: return Mono.empty(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index cf17efd5d73740bcb081cd07114e2a379019257f..6d17c31a495baa1cf0555bf66f4c09ef2faec291 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -3,6 +3,7 @@ package io.github.quickmsg.core.protocol; import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.enums.Event; @@ -64,10 +65,15 @@ public class ConnectProtocol implements Protocol { MetricManager metricManager = mqttReceiveContext.getMetricManager(); PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication(); /*check clientIdentifier exist*/ - if (channelRegistry.exists(clientIdentifier)) { - return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), - false).then(mqttChannel.close()); + if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { + if (channelRegistry.exists(clientIdentifier)) { + return mqttChannel.write( + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + false).then(mqttChannel.close()); + } + } else { + Optional.ofNullable( channelRegistry.get(clientIdentifier)) + .ifPresent(ch->ch.close().subscribe()); } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java index cae7f290fa94099621d4a22a7feb4ff9dc8a3ae8..8d7e6b9d5245adc2caa6a7650150ea5677315186 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java @@ -1,6 +1,8 @@ package io.github.quickmsg.core.protocol; +import io.github.quickmsg.common.ack.Ack; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.message.SmqttMessage; import io.github.quickmsg.common.protocol.Protocol; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -12,6 +14,7 @@ import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * @author luxurong @@ -28,10 +31,14 @@ public class PublishAckProtocol implements Protocol { @Override public Mono parseProtocol(SmqttMessage smqttMessage, MqttChannel mqttChannel, ContextView contextView) { - MqttPubAckMessage message = smqttMessage.getMessage(); - MqttMessageIdVariableHeader idVariableHeader = message.variableHeader(); - int messageId = idVariableHeader.messageId(); - return mqttChannel.cancelRetry(MqttMessageType.PUBLISH,messageId); + return Mono.fromRunnable(()->{ + ReceiveContext receiveContext = contextView.get(ReceiveContext.class); + MqttPubAckMessage message = smqttMessage.getMessage(); + MqttMessageIdVariableHeader idVariableHeader = message.variableHeader(); + int messageId = idVariableHeader.messageId(); + Optional.ofNullable(receiveContext.getTimeAckManager().getAck(mqttChannel.generateId(MqttMessageType.PUBLISH,messageId))) + .ifPresent(Ack::stop); + }); } @Override diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index 7f0235b63470b14d538dbf0c55e2495c961aa350..4007bacd823fe05b123456d3c8e4c2680278319e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -45,11 +45,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { @Override public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) { MqttMessage mqttMessage = smqttMessage.getMessage(); - log.info(" 【{}】【{}】 【{}】", - Thread.currentThread().getName(), - mqttMessage.fixedHeader().messageType(), - mqttChannel); if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { + log.info(" 【{}】【{}】 【{}】", + Thread.currentThread().getName(), + mqttMessage.fixedHeader().messageType(), + mqttChannel); Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) .ifPresent(protocol -> protocol .doParseProtocol(smqttMessage, mqttChannel) diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java index 3f7da321b4018e09812c2d39a09e1d95769bb849..5f350a3cd3c4ae2e9ea1e4da7e5efccae14975ef 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/websocket/WebSocketMqttReceiver.java @@ -53,7 +53,7 @@ public class WebSocketMqttReceiver extends AbstractSslHandler implements Receive .addHandlerLast(new ByteBufToWebSocketFrameEncoder()) .addHandlerLast(new MqttDecoder(mqttConfiguration.getMessageMaxSize())) .addHandlerLast(MqttEncoder.INSTANCE); - receiveContext.apply(MqttChannel.init(connection)); + receiveContext.apply(MqttChannel.init(connection, receiveContext.getTimeAckManager())); }); } diff --git a/smqtt-metric/pom.xml b/smqtt-metric/pom.xml index 4bb32c80feb4d7a19459dae4bc025e84ca70625e..cde0fe2a88d57e4ccdc6b2090a80d4a6bac2f719 100644 --- a/smqtt-metric/pom.xml +++ b/smqtt-metric/pom.xml @@ -8,7 +8,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-metric diff --git a/smqtt-metric/smqtt-metric-influxdb/pom.xml b/smqtt-metric/smqtt-metric-influxdb/pom.xml index 74e7dc5a2ed6d1260cdd1a928d504f424be50796..c16cf20b200e9ae4c1aac2408d94d3866b606e27 100644 --- a/smqtt-metric/smqtt-metric-influxdb/pom.xml +++ b/smqtt-metric/smqtt-metric-influxdb/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -15,7 +15,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 diff --git a/smqtt-metric/smqtt-metric-prometheus/pom.xml b/smqtt-metric/smqtt-metric-prometheus/pom.xml index 3b84a9a5487f1e2dbbd6c343baff341c2641ae96..86aad8e487837764ee6e50ddc52635205d882ffa 100644 --- a/smqtt-metric/smqtt-metric-prometheus/pom.xml +++ b/smqtt-metric/smqtt-metric-prometheus/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -19,7 +19,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 diff --git a/smqtt-persistent/pom.xml b/smqtt-persistent/pom.xml index 1d30ce8da57827cff3dcd4d0d8159b2c32ea5e13..13c67c73872af57c9f41c396f42213e91cdfc87d 100644 --- a/smqtt-persistent/pom.xml +++ b/smqtt-persistent/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 pom diff --git a/smqtt-persistent/smqtt-persistent-db/pom.xml b/smqtt-persistent/smqtt-persistent-db/pom.xml index 370e12fdd9bd8d2f7079b287149f3f3687181855..025a1325fb3109166295f50d30bbab037d42548c 100644 --- a/smqtt-persistent/smqtt-persistent-db/pom.xml +++ b/smqtt-persistent/smqtt-persistent-db/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-persistent-db - 1.1.0 + 1.1.1 3.14.11 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 compile diff --git a/smqtt-persistent/smqtt-persistent-redis/pom.xml b/smqtt-persistent/smqtt-persistent-redis/pom.xml index 022d243f2cd9c9b6e45e98eaa3277da43c274f4f..9124bac67b0fd27e9a6b2869e250d58cf66c4b1e 100644 --- a/smqtt-persistent/smqtt-persistent-redis/pom.xml +++ b/smqtt-persistent/smqtt-persistent-redis/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-persistent-redis - 1.1.0 + 1.1.1 3.15.6 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 compile diff --git a/smqtt-registry/pom.xml b/smqtt-registry/pom.xml index fcec21ed17d9554c7c7e27e92808779badf19c55..a58f471e8a895791cf4f95d7443f625f87a9df40 100644 --- a/smqtt-registry/pom.xml +++ b/smqtt-registry/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 pom diff --git a/smqtt-registry/smqtt-registry-ignite/pom.xml b/smqtt-registry/smqtt-registry-ignite/pom.xml index 4742dca32345dcaa9a9e4b711bc56c9fce68b2f9..c35895ec9d87535c63665760223c699a365d5b1d 100644 --- a/smqtt-registry/smqtt-registry-ignite/pom.xml +++ b/smqtt-registry/smqtt-registry-ignite/pom.xml @@ -5,7 +5,7 @@ smqtt-registry io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 diff --git a/smqtt-registry/smqtt-registry-scube/pom.xml b/smqtt-registry/smqtt-registry-scube/pom.xml index ea2d4677cd950d0c840146d52024768e8c47469f..0dbe4f4e6dc9385b0532d6f74a3199fd170f7535 100644 --- a/smqtt-registry/smqtt-registry-scube/pom.xml +++ b/smqtt-registry/smqtt-registry-scube/pom.xml @@ -5,7 +5,7 @@ smqtt-registry io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-registry-scube @@ -50,7 +50,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/pom.xml b/smqtt-rule/pom.xml index da4262acbadea0d0b55a8e0a8b4289590b0b667d..a1a6380c908d9394f3a4d686f89e5d02382df5cd 100644 --- a/smqtt-rule/pom.xml +++ b/smqtt-rule/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule diff --git a/smqtt-rule/smqtt-rule-dsl/pom.xml b/smqtt-rule/smqtt-rule-dsl/pom.xml index 06d143ac23490f7372ebe17f57531a67798ca607..d663114c459dcf00129a337c936d09e1094674aa 100644 --- a/smqtt-rule/smqtt-rule-dsl/pom.xml +++ b/smqtt-rule/smqtt-rule-dsl/pom.xml @@ -5,7 +5,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 @@ -15,13 +15,13 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided smqtt-rule-engine io.github.quickmsg - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-engine/pom.xml b/smqtt-rule/smqtt-rule-engine/pom.xml index 82bb16e9b7eca32a8f63b7d9d261055ac3e55690..fadf42c21063247f11d989ea1b2a5155a6af1a2f 100644 --- a/smqtt-rule/smqtt-rule-engine/pom.xml +++ b/smqtt-rule/smqtt-rule-engine/pom.xml @@ -7,7 +7,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule-engine @@ -18,44 +18,44 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided io.github.quickmsg smqtt-rule-source-kafka - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-http - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-db - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/pom.xml b/smqtt-rule/smqtt-rule-source/pom.xml index 6c46a35e39de41960187ad1953caf91830601892..1153ff46c50a759e29a7fa54a140f0a435238e66 100644 --- a/smqtt-rule/smqtt-rule-source/pom.xml +++ b/smqtt-rule/smqtt-rule-source/pom.xml @@ -8,7 +8,7 @@ smqtt-rule io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-rule-source @@ -33,7 +33,7 @@ io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml index 84389977c026787362d55aa6c6f405aab1ef13c9..868a99331d4f83d77e7ee21d8db755d045dc83f2 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-db - 1.1.0 + 1.1.1 3.14.11 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml index 2dd03f0f6d3bea634c8ae1c2db361ed40d969fd8..8895ebb39527e34cfa3f8d906525ebc982e7739f 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml @@ -6,7 +6,7 @@ io.github.quickmsg smqtt-rule-source-http - 1.1.0 + 1.1.1 smqtt-rule-source-http @@ -15,14 +15,14 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 io.github.quickmsg smqtt-common - 1.1.0 + 1.1.1 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml index 243882f9fbfedd430788235bc2b7ecdd6ac1ff7a..78e6ccb4887af0febf0b1e4f7c1dd2f361a8a0cd 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml @@ -5,12 +5,12 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-rule-source-kafka - 1.1.0 + 1.1.1 https://github.com/quickmsg/smqtt diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml index 197f89d70bb5017db8989bb2ead790725ee7c85f..b35e4461cd83967157cac7cbea4ad56a9a65f5ba 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java index 6416db1bbb1a791edcf148b11daad6c693178952..342869400d9223a4aeddc376afe2d419f58716fa 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java @@ -7,6 +7,7 @@ import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder; import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; import io.github.quickmsg.common.rule.source.Source; import io.github.quickmsg.common.rule.source.SourceBean; +import io.github.quickmsg.common.utils.JacksonUtil; import io.netty.util.internal.StringUtil; import lombok.extern.slf4j.Slf4j; @@ -91,12 +92,13 @@ public class MqttSourceBean implements SourceBean { @Override public void transmit(Map object) { String topic = (String) object.get("topic"); - String msg = (String) object.get("msg"); + Object msg =object.get("msg"); + String bytes = msg instanceof Map ? JacksonUtil.map2Json((Map) msg): msg.toString(); Boolean retain = (Boolean) object.get("retain"); Integer qos = Optional.ofNullable((Integer)object.get("qos")).orElse(0); client.publishWith() .topic(topic) - .payload(msg.getBytes()) + .payload(bytes.getBytes()) .qos(Objects.requireNonNull(MqttQos.fromCode(qos))) .retain(retain) .send() diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml index a63c83a67984e0981a814f5985f444f09d15d3d6..92921f22ec40144d894ce5b16ba8f297ddfcf398 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.0 + 1.1.1 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml index f03aa491bd250288f03ff0d590fcd72c61dddc7b..17161390653e3cb185bd398d8956d6c0904bd38d 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.0 + 1.1.1 diff --git a/smqtt-spring-boot-starter/pom.xml b/smqtt-spring-boot-starter/pom.xml index 56f05fac21621bccb5fe8386946de1a6c2908a3b..9f2a69b285bb24512354b6afd46addd16321db2a 100644 --- a/smqtt-spring-boot-starter/pom.xml +++ b/smqtt-spring-boot-starter/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-spring-boot-starter @@ -34,17 +34,17 @@ io.github.quickmsg smqtt-core - 1.1.0 + 1.1.1 smqtt-registry-scube io.github.quickmsg - 1.1.0 + 1.1.1 smqtt-ui io.github.quickmsg - 1.1.0 + 1.1.1 io.projectreactor.netty diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java index 2186cce609873db5036f6a44e6f13f9598ef4dcc..60132949c90165647e253c05f617e0dabb137996 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java @@ -1,6 +1,8 @@ package io.github.quickmsg.starter; import ch.qos.logback.classic.Level; +import io.github.quickmsg.common.auth.PasswordAuthentication; +import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.core.Bootstrap; import lombok.extern.slf4j.Slf4j; @@ -15,7 +17,7 @@ import org.springframework.context.annotation.Configuration; @Slf4j @Configuration @EnableConfigurationProperties(SpringBootstrapConfig.class) -public class AutoMqttConfiguration { +public class AutoMqttConfiguration { /** @@ -25,7 +27,8 @@ public class AutoMqttConfiguration { * @return {@link Bootstrap} */ @Bean - public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig) { + public Bootstrap startServer(@Autowired SpringBootstrapConfig springBootstrapConfig, @Autowired(required = false) PasswordAuthentication authentication) { + check(springBootstrapConfig,authentication); return Bootstrap.builder() .rootLevel(Level.toLevel(springBootstrapConfig.getLogLevel())) .tcpConfig(springBootstrapConfig.getTcp()) @@ -42,6 +45,15 @@ public class AutoMqttConfiguration { .doOnSuccess(this::printUiUrl).block(); } + private void check(SpringBootstrapConfig springBootstrapConfig, PasswordAuthentication authentication) { + if(springBootstrapConfig.getTcp().getConnectModel() == null){ + springBootstrapConfig.getTcp().setConnectModel(ConnectModel.UNIQUE); + } + if(authentication !=null){ + springBootstrapConfig.getTcp().setAuthentication(authentication); + } + } + public void printUiUrl(Bootstrap bootstrap) { String start = "\n-------------------------------------------------------------\n\t"; start += String.format("Smqtt mqtt connect url %s:%s \n\t", IPUtils.getIP(), bootstrap.getTcpConfig().getPort()); diff --git a/smqtt-ui/pom.xml b/smqtt-ui/pom.xml index 545136425dae8e4e9723a0836e02640439865129..1beb638142261bdc7a0b77a36c1862cef1ea8b2e 100644 --- a/smqtt-ui/pom.xml +++ b/smqtt-ui/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.0 + 1.1.1 4.0.0 smqtt-ui