摘要: 消息队列组件RocketMQ的介绍及应用。
7.1 MQ简介 7.1.1 什么是MQ MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。
7.1.2 MQ的应用场景 7.1.2.1 异步解耦 最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果 的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
7.1.2.2 流量削峰 流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。 在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。
秒杀处理流程如下所述:
用户发起海量秒杀请求到秒杀业务处理系统。
秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
用户收到秒杀成功的通知。
7.1.3 常见的MQ产品 目前业界有很多MQ产品,比较出名的有下面这些:
ZeroMQ 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。 ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。
RabbitMQ 使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
ActiveMQ 历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ 阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。
Kafka Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
7.2 RocketMQ入门 RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了”双11”这种万亿级的消息流转。 接下来介绍三种安装RocketMQ的方式,根据需要任选其一即可。
7.2.1 环境搭建(linux) 接下来我们先在linux平台下安装一个RocketMQ的服务
7.2.1.1 环境准备 下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
Linux 64位操作系统
64bit JDK 1.8+
7.2.1.2 安装RocketMQ 1 上传文件到Linux系统
1 2 [root@centos rocketmq] rocketmq-all-4.4.0-bin-release.zip
2 解压到安装目录
1 2 [root@centos src] [root@centos src]
7.2.1.3 启动RocketMQ 1 切换到安装目录
1 2 [root@centos rocketmq] benchmark bin conf lib LICENSE NOTICE README.md
2 启动NameServer
1 2 3 4 [root@centos rocketmq] [1] 1467 [root@centos rocketmq]
3 启动Broker
1 2 3 4 5 [root@centos rocketmq] [root@centos rocketmq]
7.2.1.4 测试RocketMQ 1 测试消息发送
1 2 3 [root@centos rocketmq] [root@centos rocketmq] org.apache.rocketmq.example.quickstart.Producer
2 测试消息接收
1 2 3 [root@centos rocketmq] [root@centos rocketmq] org.apache.rocketmq.example.quickstart.Consumer
7.2.1.5 关闭RocketMQ 1 2 [root@centos rocketmq] [root@centos rocketmq]
7.2.2 环境搭建(windows) 7.2.2.1 环境准备 下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
选择‘Binary’进行下载。
环境要求
7.2.2.2 安装RocketMQ 1 解压已下载工程到任意安装目录{MQ文件夹}
2 配置系统环境变量
系统环境变量配置 变量名:ROCKETMQ_HOME 变量值:MQ解压路径\MQ文件夹(MQ文件夹全路径)
重启服务器,使环境变量生效。
7.2.2.3 启动RocketMQ 1 启动NameServer
Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行start mqnamesrv.cmd
,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。
2 启动BROKER
编辑bin/runbroker.cmd
和 bin/runserver.cmd
文件,修改里面的JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
,启动BROKER。成功后会弹出提示框,此框勿关闭。
假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。保存并重新执行start语句。
7.2.3 环境搭建(docker) 7.2.3.1 安装RocketMQ 拉取镜像foxiswho/rocketmq:4.7.0
1 git pull foxiswho/rocketmq:4.7.0
7.2.3.2 运行RocketMQ 1 运行namesrv
1 2 3 4 5 6 docker run -d -v /rocketMQ/logs:/home/rocketmq/logs \ --name rmqnamesrv \ -e "JAVA_OPT_EXT=-Xms256M -Xmx256M -Xmn128m" \ -p 9876:9876 \ foxiswho/rocketmq:4.7.0 \ sh mqnamesrv
2 运行broker
在/rocketMQ/conf
目录下创建 broker.conf 文件,内容如下:
1 2 3 4 5 6 7 8 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1 = {docker宿主机IP}
将{docker宿主机IP}
替换成你真实的docker宿主机IP,如果docker是本机,就填127.0.0.1
,否则broker会给出它在容器中的IP。 启动容器:
1 2 3 4 5 6 7 8 docker run -d -v /rocketMQ/logs:/home/rocketmq/logs -v /rocketMQ/store:/home/rocketmq/store \ -v /rocketMQ/conf:/home/rocketmq/conf \ --name rmqbroker \ -e "NAMESRV_ADDR=rmqnamesrv:9876" \ -e "JAVA_OPT_EXT=-Xms256M -Xmx256M -Xmn128m" \ -p 10911:10911 -p 10912:10912 -p 10909:10909 \ foxiswho/rocketmq:4.7.0 \ sh mqbroker -c /home/rocketmq/conf/broker.conf
7.2.4 RocketMQ的架构及概念
如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer。
Broker(邮递员) Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
NameServer(邮局) 消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
Producer(寄件人) 消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
Consumer(收件人) 消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
Message Queue(邮件) 为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息
Message Message 是消息的载体。
Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
7.2.5 RocketMQ控制台安装 1 下载
1 2 https://github.com/apache/rocketmq-externals/releases
2 修改配置文件
1 2 3 server.port=7777 rocketmq.config.namesrvAddr=192.168.109.131:9876
3 打成jar包,并启动
1 2 3 4 mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
4 访问控制台
7.3 消息发送和接收演示 接下来我们使用Java代码来演示消息的发送和接收
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency >
7.3.1 发送消息 消息发送步骤:
创建消息生产者, 指定生产者所属的组名
指定Nameserver地址
启动生产者
创建消息对象,指定主题、标签和消息体
发送消息
关闭生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class RocketMQSendTest { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("myproducer-group" ); producer.setNamesrvAddr("127.0.0.1:9876" ); producer.start(); Message msg = new Message("myTopic" , "myTag" , ("RocketMQ Message" ).getBytes()); SendResult sendResult = producer.send(msg, 10000 ); System.out.println(sendResult); producer.shutdown(); } }
7.3.2 接收消息 消息接收步骤:
创建消息消费者, 指定消费者所属的组名
指定Nameserver地址
指定消费者订阅的主题和标签
设置回调函数,编写处理消息的方法
启动消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RocketMQReceiveTest { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group" ); consumer.setNamesrvAddr("127.0.0.1:9876" ); consumer.subscribe("myTopic" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started." ); } }
7.4 案例 接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
7.4.1 订单微服务发送消息 1 在shop-order
中添加rocketmq的依赖
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.4.0</version > </dependency >
2 添加配置
1 2 3 4 rocketmq: name-server: 127.0 .0 .1 :9876 producer: group: shop-order
3 编写测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @RestController @Slf4j public class OrderController { @Autowired private OrderService orderService; @Autowired private ProductService productService; @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/order/prod/{pid}") public Order order (@PathVariable("pid") Integer pid) { log.info(">>客户下单,这时候要调用商品微服务查询商品信息" ); Product product = productService.findByPid(pid); if (product == null ) { Order order = new Order(); order.setPname("下单失败" ); return order; } log.info(">>商品信息,查询结果:" + JSON.toJSONString(product)); Order order = new Order(); order.setUid(1 ); order.setUsername("测试用户" ); order.setPid(product.getPid()); order.setPname(product.getPname()); order.setPprice(product.getPprice()); order.setNumber(1 ); orderService.createOrder(order); rocketMQTemplate.convertAndSend("order-topic" , order); return order; } }
7.4.2 用户微服务订阅消息 1 修改shop-user
模块配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.4.0</version > </dependency >
2 修改主类
1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class UserApplication { public static void main (String[] args) { SpringApplication.run(UserApplication.class, args); } }
3 修改配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 server: port: 8071 spring: application: name: service-user datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:///shop?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true username: root password: root jpa: properties: hibernate: hbm2ddl: auto: update dialect: org.hibernate.dialect.MySQL5InnoDBDialect cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 rocketmq: name-server: 127.0 .0 .1 :9876
4 编写消息接收服务
1 2 3 4 5 6 7 8 @Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user",topic = "order-topic") public class SmsService implements RocketMQListener <Order > { public void onMessage (Order order) { log.info("收到一个订单信息{},接下来发送短信" , JSON.toJSONString(order)); } }
5 启动服务,执行下单操作,观看后台输出
7.5 发送不同类型的消息 7.5.1 普通消息 RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
1 2 3 4 5 6 7 8 9 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @RunWith(SpringRunner.class) @SpringBootTest(classes = OrderApplication.class) public class MessageTypeTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testSyncSend () { SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1" , "这是一条同步消息" ); System.out.println(sendResult); } @Test public void testAsyncSend () throws InterruptedException { rocketMQTemplate.asyncSend("test-topic-1" , "这是一条异步消息" , new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult); } @Override public void onException (Throwable throwable) { System.out.println(throwable); } }); Thread.sleep(30000000 ); } @Test public void testOneWay () { rocketMQTemplate.sendOneWay("test-topic-1" , "这是一条单向消息" ); } }
三种发送方式的对比
发送方式
发送 TPS
发送结果反馈
可靠性
同步发送
快
有
不丢失
异步发送
快
有
不丢失
单向发送
最快
无
可能丢失
7.5.2 顺序消息 顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
1 2 3 4 5 public void testSyncSendOrderly () { rocketMQTemplate.syncSendOrderly("test-topic-1" , "这是一条异步顺序消息" ,"xxxx" ); }
7.5.3 事务消息 RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务消息交互流程:
两个概念:
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
事务消息发送步骤:
发送方将半事务消息发送至RocketMQ服务端。
RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤:
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
1 2 3 4 5 6 7 8 @Entity(name = "shop_txlog") @Data public class TxLog { @Id private String txId; private Date date; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Service public class OrderServiceImpl4 { @Autowired private OrderDao orderDao; @Autowired private TxLogDao txLogDao; @Autowired private RocketMQTemplate rocketMQTemplate; public void createOrderBefore (Order order) { String txId = UUID.randomUUID().toString(); rocketMQTemplate.sendMessageInTransaction( "tx_producer_group" , "tx_topic" , MessageBuilder.withPayload(order).setHeader("txId" , txId).build(), order); } @Transactional public void createOrder (String txId,Order order) { orderDao.save(order); TxLog txLog = new TxLog(); txLog.setTxId(txId); txLog.setDate(new Date()); txLogDao.save(txLog); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener { @Autowired private OrderServiceImpl4 orderServiceImpl4; @Autowired private TxLogDao txLogDao; @Override public RocketMQLocalTransactionState executeLocalTransaction (Message message, Object o) { String txId = (String) message.getHeaders().get("txId" ); try { Order order = (Order) o; orderServiceImpl4.createOrder(txId,order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception ex) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message message) { String txId = (String) message.getHeaders().get("txId" ); TxLog txLog = txLogDao.findById(txId).get(); if (txLog!=null ){ return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } }
7.6 消息消费要注意的细节 1 2 3 4 5 6 7 @RocketMQMessageListener( consumerGroup = "shop",//消费者分组 topic = "order-topic",//要消费的主题 consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序 messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群 ) public class SmsService implements RocketMQListener <Order > {}
RocketMQ支持两种消息模式:
广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;
集群消费: 一条消息只能被一个消费者实例消费