什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)例如:寄快递
消息中间件使用场景
异步处理
场景说明:用户注册后,需要发注册邮件和注册短信
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端
引入消息队列,改造后的架构如下
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了2倍
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统通过调用库存系统的接口来对库存进行操作
解耦合后:
订单系统:假如在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统与库存系统的应用解耦
常见消息中间件比较
特性MQ |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
---|---|---|---|---|
生产者消费者模式 |
支持 |
支持 |
支持 |
支持 |
发布订阅模式 |
支持 |
支持 |
支持 |
支持 |
请求回应模式 |
支持 |
支持 |
不支持 |
不支持 |
Api完备性 |
高 |
高 |
高 |
高 |
多语言支持 |
支持 |
支持 |
java |
支持 |
单机吞吐量 |
万级 |
万级 |
万级 |
十万级 |
消息延迟 |
无 |
微秒级 |
毫秒级 |
毫秒级 |
可用性 |
高(主从) |
高(主从) |
非常高(分布式) |
非常高(分布式) |
消息丢失 |
低 |
低 |
理论上不会丢失 |
理论上不会丢失 |
文档的完备性 |
高 |
高 |
较高 |
高 |
提供快速入门 |
有 |
有 |
有 |
有 |
社区活跃度 |
高 |
高 |
中 |
高 |
商业支持 |
无 |
无 |
商业云 |
商业云 |
安装
解压即安装,注意 jdk 版本为 8
启动
启动broker使用命令,可以开启自动创建topic,否则会报错
建议使用命令启动
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
autoCreateTopicEnable=true
开启自动创建topic
使用
硬编码方式发送
导入依赖无需配置
org.apache.rocketmq rocketmq-client 4.4.0
发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- 创建消息生产者, 指定生产者所属的组名
- 指定Nameserver地址
- 启动生产者
- 创建消息对象,指定主题、标签和消息体
- 发送消息
- 关闭生产者
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
// 发送同步消息
public class RocketMQSendTest01 {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动生产者
producer.start();
for (int i = 0; i
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。只会等待MQ发送状态
相比于同步消息,就是在发送的时候new一个SendCallback类重写onSuccess以及onException方法
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
//发送异步消息
//异步发送比较浪费性能,经常会失败,所以发送多几次并且让线程休眠几秒
public class RocketMQSendTest02 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i
发送单向消息
单向发送消息 类似UPD 只管发不管能不能收到,这种方式主要用在不特别关心发送结果的场景,例如日志发送。
相比于同步消息发送时候没有返回值
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
// 单向发送消息 类似UPD 只管发不管能不能收到
public class RocketMQSendTest03 {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i
硬编码方式接收
消息接收步骤:
- 创建消息消费者, 指定消费者所属的组名
- 指定Nameserver地址
- 指定消费者订阅的主题和标签
- 设置回调函数,编写处理消息的方法
- 启动消息消费者
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
// 接收消息
public class RocketMQReceiveTest01 {
public static void main(String[] args) throws Exception {
// 创建消费者 指定所属组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer-group");
// 指定NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 指定订阅生产者的主题和标签
consumer.subscribe("myTopic", "*");
//CLUSTERING-clustering 集群
//BROADCASTING-broadcasting 广播
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置回调方法 编写消息处理方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("消费者1已经启动……");
}
}
负载均衡模式(默认模式)
不设置接收模式默认就是负载均衡,轮询
也就是不写consumer.setMessageModel(MessageModel.BROADCASTING);
广播模式
消费者采用广播的方式消费消息,每个消费者(订阅同一个主题的)都能接收到消息,并且每个消费者消费的消息都是相同的
consumer.setMessageModel(MessageModel.BROADCASTING);
集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
与spingboot集成
业务场景:下单成功之后,向下单用户发送短信
添加依赖与配置文件
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2
生产者配置文件
server:
port: 8091
rocketmq:
name-server: localhost:9876
producer:
group: shop-order
消费者配置文件
server:
port: 8071
rocketmq:
name-server: localhost:9876
项目地址E:CodesIdea_java_worksapesourcespringboot微服务springboot_rocketmq02
结构如下
生产者
package com.apesource.shoporder.controller;
import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Autowired(required = false)
private RocketMQTemplate template;
@RequestMapping("/order/prod/{pid}")
public Order order(Integer pid){
// 下单创建订单 根据商品id查询数据库的到信息赋值给order 这里直接新建order模拟
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname("大豫竹");
order.setPprice(2.0);
order.setNumber(1);
// 下订单 给数据库的order表新增一行数据 这里输出来模拟
System.out.println(order);
// 下单成功 给用户发短信 这里直接发送order对象来模拟
template.convertAndSend("order-topic",order);
return order;
}
}
消费者监听器
package com.apesource.shopuser.listener;
import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
//接收信息并且发送短信给用户
@Component
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsListener implements RocketMQListener {
@Override
public void onMessage(Order order) {
System.out.println(order);
}
}
按照逻辑前端发送请求
order服务会打印order对象,user服务也会打印order对象
order服务
user服务
评论(0)