★消息队列16篇
RocketMQ是一款基于Java开发的分布式消息中间件,它以其高性能、高可靠性、高实时性以及分布式特性而广受好评。
它支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。互联网场景中经常使用RocketMQ进行消息路由、订阅发布、异步解耦、流量削减峰等操作,来缓解系统的压力。
特性 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
开发语言 | erlang | java | scala |
单机吞吐量 | 1w+ | 10w+ | 10w+ |
时效性 | us级 | ms级 | ms级以内 |
可用性 | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
消息可靠性 | 基本不丢 | 参数化配置和持久化:基本不丢 | 参数化配置和持久化:基本不丢 |
功能特性 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
生态 | 开源、稳定、社区活跃度高 | 阿里开源,交给Apache,社区活跃度低 | Apache开发,开源、高吞吐量、社区活跃度高 |
技术选型决策参考:
(1)中小型软件,建议选RabbitMQ, 中小型软件数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。
不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。
(2)大型软件公司,根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。
针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟有能力改JAVA源码的人,还是相当多的。
至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景。引入MQ之后,必然导致系统可用性降低,复杂性增大。
1. 解耦:
比如说系统A会交给系统B去处理一些事情,但是A不想直接跟B有关联,避免耦合太强,就可以通过在A,B中间加入消息队列,A将要任务的事情交给消息队列 ,B订阅消息队列来执行任务。
这种场景很常见,比如A是订单系统,B是库存系统,可以通过消息队列把削减库存的工作交予B系统去处理。如果A系统同时想让B、C、D…多个系统处理问题的时候,这种优势就更加明显了。
2. 有序性:
先进先出原理,先来先处理,比如一个系统处理某件事需要很长一段时间,但是在处理这件事情时候,有其他人也发出了请求,可以把请求放在消息队里,一个一个来处理。
对数据的顺序性和一致性有强需求的业务,比如同一张银行卡同时被多个入口使用,需要保证入账出账的顺序性,避免出现数据不一致。
3. 消息路由/数据分发:
按照不同的规则,将队列中消息发送到不同的其他队列中
通过消息队列将不同染色的请求发送到不同的服务去操作。这样达成了流量按照业务拆分的目的。
4、
异步处理:
处理一项任务的时候,有3个步骤A、B、C,需要先完成A操作, 然后做B、C 操作。任务执行成功与否强依赖A的结果,但不依赖B、C 的结果。
如果我们使用串行的执行方式,那处理任务的周期就会变长,系统的整体吞吐能力也会降低(在同一个系统中做异步其实也是比较大的开销),所以使用消息队列是比较好的办法。
登录操作就是典型的场景:A:执行登录并得到结果、B:记录登录日志、C:将用户信息和Token写入缓存。 执行完A就可以从登录页跳到首页了,B、C让服务慢慢去消化,不阻塞当前操作。
5、
削峰:
将峰值期间的操作削减,比如A同学的整个操作流程包含12个步骤,后续的11个步骤是不需要强关注结果的数据,可以放在消息队列中。
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。这些角色通常以集群的方式存在,RocketMQ 基于纯Java开发,
具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。
1. Producer:
负责生产消息,一般由业务系统负责。生产者通过调用API将消息发送到指定的Topic(主题)中。
2. Broker:
消息存储中心,负责接收来自Producer的消息并存储,同时Consumer也从这里取得消息。Broker还存储与消息相关的元数据,包括消费者组、消费进度偏移量、队列信息等。每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。在实际部署中,Broker对应一台服务器,并分为Master与Slave两种类型,Master负责读写,Slave只负责读,以此实现数据的备份和负载均衡。
3. Consumer:
负责消费消息,一般由后台系统负责。消费者通过订阅Topic来获取消息,并根据业务逻辑进行处理。消费者可以以集群消费或广播消费的方式消费消息。
4. NameServer:
充当路由消息的提供者,负责保存Broker的元数据信息,并供Producer和Consumer查询。NameServer被设计成几乎无状态的,可以横向扩展,节点之间无通信。
在RocketMQ中,Group功能是其核心特性之一。Group主要分为发送端Group和消费端Group。
用来区分消息的种类,表示一类消息的逻辑名字,消息的逻辑管理单位,无论生产还是消费消息,都需要执行Topic。比如一个Topic专门用于用户订单消息发送,一个Topic专门用于扣减或增加积分的。
MessageQueue是RocketMQ中用于存储和传输消息的数据结构。每个MessageQueue都有一个唯一标识符,由Topic名称和队列编号组成。MessageQueue具有以下特点:
Tag是RocketMQ中用于对消息进行分类和过滤的标记。生产者可以在发送消息时指定Tag,消费者可以根据Tag来过滤和订阅消息。Tag的使用可以使得消息的管理和消费更加灵活和高效。例如,一个电商系统可能会根据商品的类别(如服装、电子产品等)来设置不同的Tag,消费者可以根据这些Tag来订阅和处理特定类别的消息。
Offset在RocketMQ中用于标识消费者在消息队列中的位置。每个消费者都会维护一个Offset,以便知道下一次从哪里开始消费。Offset的使用可以确保消息不丢失、避免消息重复消费以及支持消息的顺序消费。
引入依赖
在Java项目中,通常通过Maven或Gradle等构建工具来引入RocketMQ的客户端依赖。以Maven为例,可以在
pom.xml
文件中添加以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>最新版本号(5.3.1)</version>
</dependency>
实现生产者
在Java中,通过创建
DefaultMQProducer
对象来实现消息的生产者。生产者需要设置NameServer地址,并调用
start()
方法初始化。然后,可以创建
Message
对象并设置主题、标签和消息内容,最后调用
send()
方法发送消息。
public class Producer {
public static void main(String[] args) throws Exception {
// 创建一个消息生产者,并设置消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("your_producer_group");
// 指定NameServer地址
producer.setNamesrvAddr("your_nameserver_address");
// 初始化Producer
producer.start();
// 创建消息对象,并设置主题、标签和消息内容
Message msg = new Message("your_topic", "your_tag", "Hello RocketMQ".getBytes());
// 发送消息并获取发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者实例
producer.shutdown();
}
}
实现消费者
在Java中,通过创建
DefaultMQPushConsumer
对象来实现消息的消费者。消费者需要设置NameServer地址和消费组名称,并调用
subscribe()
方法订阅主题和标签。然后,注册消息监听器来处理接收到的消息。
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建一个消息消费者,并设置消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
// 指定NameServer地址
consumer.setNamesrvAddr("your_nameserver_address");
// 订阅指定Topic下的所有消息
consumer.subscribe("your_topic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("接收到消息: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
}
本文只是了解下RocketMQ的基本原理和实现,在后面的章节中,我们会带来RocketMQ的完整解读。