常见概念
编程基础模型
Apache RocketMQ作为阿里开源的一款高性能、高吞吐的分布式消息中间件
特点
概念
安装前提条件(推荐)
安装
安装64位 JDK1.8
安装maven 3.2+
安装教程:Linux软件安装教程
安装RocketMQ4.X可能出现的问题
NameServer内存不够怎么处理
报错问题如下
[root@iZwz94sw188z3yfl7lpmmsZ apache-rocketmq]# sh bin/mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006ec800000, 2147483648, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/hs_err_pid8993.log
解决如下 编辑 bin/runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"
安装RocketMQ4.x控制台
简介:阿里云服务器安装RocketMQ控制台
上传源码包-》解压-》进入rocketmq-console目录-》 编译打包 mvn clean package -Dmaven.test.skip=true
务必修改下面两个,再进行编译打包
进入target目录 ,启动 java -jar rocketmq-console-ng-1.0.0.jar
守护进程方式启动 nohup java -jar rocketmq-console-ng-1.0.0.jar &
必须先启动 nameserver和broker
引入依赖
<!-- 注意,依赖包的版本必须和安装的RocketMQ的版本一致 否则不能自动创建topic -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
常见的错误1
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker
新增配置:conf/broker.conf (属性名称brokerIP1=broker所在的公网ip地址 )
新增这个配置:brokerIP1=120.76.62.13
启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
常见错误2
MQClientException: No route info of this topic, TopicTest1
原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通
解决:
通过 sh bin/mqbroker -m 查看配置
autoCreateTopicEnable=true 则自动创建topic
Centos7关闭防火墙 systemctl stop firewalld
常见错误3
控制台查看不了数据,提示连接 10909错误
原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组需要增加一个端口 10909
其他错误
https://blog.csdn.net/qq_14853889/article/details/81053145
https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156
https://blog.csdn.net/wangmx1993328/article/details/81588217
代码实现
// JmsConfig.java
public class JmsConfig {
public static final String NAME_SERVER_ADDR = "47.112.124.138:9876";
public static final String TOPIC = "icanci_pay_test_topic";
}
@Component
public class PayProducer {
private String producerGroup = "pay_producer_group";
private DefaultMQProducer producer;
public PayProducer() {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(JmsConfig.NAME_SERVER_ADDR);
start();
}
public DefaultMQProducer getProducer() {
return producer;
}
/**
* 对象在使用之前必须要调用一下,只能初始化一次
*/
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 停止
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown() {
}
}
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v1/pay_cb")
public Object callBack(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(JmsConfig.TOPIC, "taha", ("hello xtt " + text).getBytes());
SendResult send = payProducer.getProducer().send(message);
System.out.println(send);
return new HashMap<>();
}
}
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_consumer_group";
public PayConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(JmsConfig.TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
Message msg = msgs.get(0);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println("consumer start");
}
}
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed
2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
1、设置producer: producer.setVipChannelEnabled(false);
2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
4、DESC: service not available now, maybe disk full, CL:
解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
常见问题处理
https://blog.csdn.net/sqzhao/article/details/54834761
https://blog.csdn.net/mayifan0/article/details/67633729
https://blog.csdn.net/a906423355/article/details/78192828
单节点
主从(异步、同步双写)
双主
双主双从,多主多从模式(异步复制)
双主双从,多主多从模式(同步双写)
推荐方案:主从(异步、同步双写)、双主双从,多主多从模式(异步复制)、双主双从,多主多从模式(同步双写)
简介:主从模式如何保证消息可靠性
TODO 电脑虚拟机总是跑不起来
环境: RocketMQ4.X + JDK8 + Maven +CentOS7
机器列表:
server1 ssh root@192.168.159.129
server2 ssh root@192.168.159.130
server3 ssh root@192.168.159.131
server4 ssh root@192.168.159.132
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"
启动两个机器的 nameserver
nohup sh bin/mqnamesrv &
全路径
/usr/local/software/rocketmq/distribution/target/apache-rocketmq
主节点
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
从节点
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
修改事项
pom.xml 里面的rocketmq版本号
application.properties里面的nameserver
增加 rocketmq.config.namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar
centos7关闭防火墙
systemctl stop firewalld
远程拷贝到本地
scp xdclass@192.168.0.106:/Users/xdclass/Desktop/xdclass/消息队列/data/第3章/第7集/apache-maven-3.6.0-bin.tar.gz /usr/local/software
scp root@192.168.0.106:/Users/xdclass/Desktop/xdclass/消息队列/data/第3章/第7集/apache-maven-3.6.0-bin.tar.gz /usr/local/software
Broker分为master与slave,一个master可以对应多个Slave,但一个slave只能对应一个master,master与slave通过相同的Broker Name来匹配,不同的broker Id来定义是master还是slave
只有master才能进行写入操作,slave不允许写入只能同步,同步策略取决于master的配置。
客户端消费可以从master和slave消费,默认消费者都从master消费,如果在master挂后,客户端从NameServer中感知到Broker宕机,就会从slave消费, 感知非实时,存在一定的滞后性,slave不能保证master的消息100%都同步过来了,会有少量的消息丢失。但一旦master恢复,未同步过去的消息会被最终消费掉
如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
生产者Producer重试(异步和SendOneWay下配置无效)
消费端重试
原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题。
注意:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
超过重试次数人工补偿
消费端去重
一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
producer.send(message, new SendCallback(){
onSuccess(){}
onException(){}
})
汇总对比
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
什么是延迟消息:
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息
代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
使用message.setDelayTimeLevel(xxx) //xxx是级别,1表示配置里面的第一个级别,2表示第二个级别
定时消息:目前rocketmq开源版本还不支持,商业版本则有,两者使用场景类似
使用场景
//可以使用Jdk8的lambda表达式,只有一个方法需要被实现
producer.send(message, new MessageQueueSelector(){
select(List<MessageQueue> mqs, Message msg, Object arg){
Integer queueNum = (Integer)arg;
return mqs.get(queueNum);
}
},0)
顺序消息和对应可以使用的场景,订单系统,
什么是顺序消息:消息的生产和消费顺序一致
全局顺序:topic下面全部消息都要有序(少用)
局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
性能要求高
电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费
(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证业务的高性能)
顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
注意:
生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO的
例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,
根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//如果是订单号是字符串,则进行hash,得到一个hash值
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
消费端要在保证消费同个topic里的同个队列,不应该用MessageListenerConcurrently,
应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
简介:讲解使用代码编写案例,进行RocketMQ顺序消息消费实战
consumeFromWhere配置 [ 某些情况失效:参考https://blog.csdn.net/a417930422/article/details/83585397 ]
CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
allocateMessageQueueStrategy:
offsetStore:消息消费进度存储器 offsetStore 有两个策略:
consumeThreadMin 最小消费线程池数量
consumeThreadMax 最大消费线程池数量
pullBatchSize: 消费者去broker拉取消息时,一次拉取多少条。可选配置
consumeMessageBatchMaxSize: 单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
messageModel : 消费者消费模式, CLUSTERING——默认是集群模式CLUSTERING BROADCASTING——广播模式
一个Message只有一个Tag,tag是二级分类
过滤分为Broker端和Consumer端过滤
一般是监听 * ,或者指定 tag,|| 运算 , SLQ92 , FilterServer等;
注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失
在Broker 端进行MessageTag过滤,遍历message queue存储的 message tag和 订阅传递的tag 的hashcode不一样则跳过,符合的则传输给Consumer,在consumer queue存储的是对应的hashcode, 对比也是通过hashcode对比; Consumer收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode
如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列
常见错误
The broker does not support consumer to filter message by SQL92
解决:broker.conf 里面配置如下
enablePropertyFilter=true
备注,修改之后要重启Broker
master节点配置:vim conf/2m-2s-async/broker-a.properties
slave节点配置:vim conf/2m-2s-async/broker-a-s.properties
什么是offset
message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后处理
message queue中的maxOffset表示消息的最大offset, maxOffset并不是最新的那条消息的offset,而是最新消息的offset+1,minOffset则是现存在的最小offset。
fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。所以比minOffset还要小的那些消息已经不在broker上了,就无法被消费
类型(父类是OffsetStore):
有什么用
建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果用另外一种pullConsumer需要自己进行维护OffsetStore
消息存储是由ConsumeQueue和CommitLog配合完成
ConsumeQueue: 是逻辑队列, CommitLog是真正存储消息文件的,存储的是指向物理存储的地址
Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘
默认地址:store/consumequeue/{topicName}/{queueid}/fileName
什么是CommitLog:
Broker里面一个Topic
高效原因
CommitLog顺序写, 存储了MessagBody、message key、tag等信息
ConsumeQueue随机读 + 操作系统的PageCache + 零拷贝技术ZeroCopy
零拷贝技术
read(file, tmp_buf, len);
write(socket, tmp_buf, len);
例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)
ZeroCopy:
RocketMQ事务消息:
半消息Half Message:
消息回查:
整体交互流程
RocketMQ事务消息的状态
关于事务消息的消费
4台机器, 2台部署NameServer, 4台都部署Broker, 双主双从 同步复制,异步刷盘
jdk、maven、rocketmq上传和安装
机器列表
server1 ssh root@192.168.159.133 部署nameServer Broker-a
server2 ssh root@192.168.159.130 部署nameServer Broker-a-s
server3 ssh root@192.168.159.131 Broker-b
server4 ssh root@192.168.159.132 Broker-b-s
修改RocketMQ(启动内存配置, 4个机器都要修改, 其中runbroker.sh修改4个,runserver.sh修改2个)
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"
启动两个机器的 nameserver
nohup sh bin/mqnamesrv &
全路径
/usr/local/software/rocketmq/distribution/target/apache-rocketmq
编辑并启动rocketmq命令
broker-a主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog
broker-a从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog
broker-b主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog
参考命令
CentOS 6.5关闭防火墙
servcie iptables stop
centos7关闭防火墙
systemctl stop firewalld
systemctl stop firewalld.service
注意:如果连接不了broker,日志提示连接的端口少2位,记得检查防火墙是否关闭
修正上节的错误 brokername名称
修改事项
pom.xml 里面的rocketmq版本号
路径 /usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties里面的nameserver
增加 rocketmq.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar
CentOS 6.5关闭防火墙
servcie iptables stop
centos7关闭防火墙
systemctl stop firewalld
systemctl stop firewalld.service
Topic创建线上禁止开启自动创建
一般是有专门的后台进行队列的CRUD,应用上线需要申请队列名称
生产环境推荐配置
性能分析思路
消息队列选择问题:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重
接口幂等性保障 ,消费端处理业务消息要保持幂等性
Redis
setNX() , 做消息id去重 java版本目前不支持设置过期时间
//Redis中操作,判断是否已经操作过 TODO
boolean flag = jedis.setNX(key);
if(flag){
//消费
}else{
//忽略,重复消费
}
拓展(如果再用expire则不是原子操作,可以用下面方式实现分布式锁)
加锁
String result = jedis.set(key, value, "NX", "PX", expireTime)
解锁(Lua脚本,先检查key,匹配再释放锁,lua可以保证原子性)
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
备注:lockKey可以是商品id,requestId用于标示是同个客户端
Incr 原子操作:key自增,大于0 返回值大于0则说明消费过
int num = jedis.incr(key);
if(num == 1){
//消费
}else{
//忽略,重复消费
}
上述两个方式都可以,但是不能用于分布式锁,考虑原子问题,但是排重可以不考虑原子问题,数据量多需要设置过期时间
数据库去重表
MQ架构配置
发送端高可用
双主双从架构:创建Topic对应的时候,MessageQueue创建在多个Broker上
即相同的Broker名称,不同的brokerid(即主从模式);当一个Master不可用时,组内其他的Master仍然可用。
但是机器资源不足的时候,需要手工把slave转成master,目前不支持自动转换,可用shell处理
消费高可用
提高消息的消费能力
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。