消息中间件的产生的背景
在网络通讯中,Http请求默认采用同步请求方式,基于请求与响应模式,在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响
1.什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)例如:寄快递
2.消息中间件使用场景
2.1 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端
引入消息队列,改造后的架构如下
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了2倍
2.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口
解耦合后:
订单系统:假如在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统与库存系统的应用解耦
3.常见消息中间件比较
特性MQ |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
生产者消费者模式 |
支持 |
支持 |
支持 |
支持 |
发布订阅模式 |
支持 |
支持 |
支持 |
支持 |
请求回应模式 |
支持 |
支持 |
不支持 |
不支持 |
Api完备性 |
高 |
高 |
高 |
高 |
多语言支持 |
支持 |
支持 |
java |
支持 |
单机吞吐量 |
万级 |
万级 |
万级 |
十万级 |
消息延迟 |
无 |
微秒级 |
毫秒级 |
毫秒级 |
可用性 |
高(主从) |
高(主从) |
非常高(分布式) |
非常高(分布式) |
消息丢失 |
低 |
低 |
理论上不会丢失 |
理论上不会丢失 |
文档的完备性 |
高 |
高 |
较高 |
高 |
提供快速入门 |
有 |
有 |
有 |
有 |
社区活跃度 |
高 |
高 |
中 |
高 |
商业支持 |
无 |
无 |
商业云 |
商业云 |
4.RocketMQ
RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了”双11″这种万亿级的应用场景考验。
4.1 环境准备
下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
- 64位操作系统
- JDK 1.8+
- 安装Maven
4.2 安装RocketMQ
- 解压缩安装包
- 配置环境变量
-
- 变量名:ROCKETMQ_HOME 变量值:MQ解压缩路径
- 编辑: path %ROCKETMQ_HOME%bin
4.3 启动RocketMQ
- 切换到安装目录
-
- rocketmq的bin目录下
- 启动NameServer
-
- start mqnamesrv.cmd
- 启动Broker
-
- start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
- 如果弹出框提示‘错误: 找不到或无法加载主类 xxxxxx’。在bin下找到并打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号
4.4 安装可视化插件
- github下载
- rocketmq-externals-rocketmq-console-1.0.0.zip解压压缩包
- 进入rocketmq-consolesrcmainresources文件加,用编辑器打开application.properties文件
- 进入rocketmq-externalsrocketmq-console 文件夹,执行:mvn clean package -Dmaven.test.skip=true,编译生成jar包
- 编译成功后,在rocketmq-externalsrocketmq-console下会生成target文件夹
- 进入target后执行:java -jar rocketmq-console-ng-1.0.0.jar,这里是在启动jar工程。启动完毕后,在浏览器输入:http://localhost:8085/进入控制台
5.RocketMQ的架构及概念
如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer
- Broker(邮局,邮递员)
-
- Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
- NameServer(各个邮局的管理机构)
-
- 消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
- Producer(寄件人)
-
- 消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
- Consumer(收件人)
-
- 消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
- Topic(地区)
-
- 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
- Message Queue
-
- 为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个 Message Queue读取消息
- Message
-
- Message 是消息的载体。
6. 消息发送接受
org.apache.rocketmq
rocketmq-client
4.4.0
6.1 发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。消息发送步骤:
1. 创建消息生产者, 指定生产者所属的组名
2. 指定Nameserver地址
3. 启动生产者
4. 创建消息对象,指定主题、标签和消息体
5. 发送消息
6. 关闭生产者
//发送消息
public class RocketMQSendTest {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("192.168.109.131:9876");
//3. 启动生产者
producer.start();
//4. 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag", ("RocketMQ
Message").getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
//6. 关闭生产者
producer.shutdown();
}
}
6.2 接收消息
消息接收步骤:
1. 创建消息消费者, 指定消费者所属的组名
2. 指定Nameserver地址
3. 指定消费者订阅的主题和标签
4. 设置回调函数,编写处理消息的方法
5. 启动消息消费者
//接收消息
public class RocketMQReceiveTest {
public static void main(String[] args) throws MQClientException {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.109.131:9876");
//3. 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
System.out.println("Receive New Messages: " + msgs);
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
6.3 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。只会等待MQ发送状态
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动生产者
producer.start();
for (int i = 0;i
6.4 单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动生产者
producer.start();
for (int i = 0;i
6.5消费消息
1. 负载均衡模式(默认方式)
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
2. 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
7 使用场景
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信
7.1 订单微服务发送消息
1 在 shop-order服务中添加rocketmq的依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2
2 添加配置
rocketmq:
name-server: 127.0.0.1:9876 #rocketMQ服务的地址
producer:
group: shop-order #生产者组
3 编写测试代码
7.2 用户微服务订阅消息
1 修改 shop-user 模块配置
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2
2 修改置文件
rocketmq:
name-server: 127.0.0.1:9876
3 编写消息接收服务
//发送短信的服务
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener {
@Override
public void onMessage(Order order) {
log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
}
}
评论(0)