集成RabbitMQ+MQ常用操作
文章目录1.环境搭建1.Docker安装RabbitMQ1.拉取镜像2.安装命令3.开启5672和15672端口4.登录控制台2.整合Spring AMQP1.sun-common模块下创建新模块2.引入amqp依赖和fastjson3.新建一个mq-demo的模块1.在sun-frame下创建mq-demo2.然后在mq-demo下创建生产者和消费者子模块3.查看是否交给父模块管理了4.在mq-demo模块引入sun-common-rabbitmq依赖5.publisher引入sun-common-test依赖6.将sun-common-rabbitmq clean-install一下7.给consumer和publisher都创建主类1.ConsumerApplication.java2.PublisherApplication.java4.测试MQ1.application.yml mq的最基本配置2.consumer1.TestConfig.java MQ配置2.TestConfigListener.java 监听队列3.publisher1.TestConfig.java 测试(注意指定启动类)2.结果2.基本交换机1.Fanout1.FanoutConfig.java 交换机配置2.FanoutConfigListener.java 监听者3.FanoutConfig.java 生产者2.Direct1.DirectConfig.java 交换机配置2.DirectConfigListener.java 监听者3.DirectConfig.java 生产者1.环境搭建1.Docker安装RabbitMQ1.拉取镜像docker pull rabbitmq:3.8-management2.安装命令docker run -e RABBITMQ_DEFAULT_USERsun -e RABBITMQ_DEFAULT_PASSmq -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 -d 699038cb2b96 # 注意这里是镜像id需要替换3.开启5672和15672端口4.登录控制台15672端口2.整合Spring AMQP1.sun-common模块下创建新模块2.引入amqp依赖和fastjson?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion !-- 继承父模块的版本和通用依赖 -- parent groupIdcom.sunxiansheng/groupId artifactIdsun-common/artifactId version1.0-SNAPSHOT/version /parent artifactIdsun-common-rabbitmq/artifactId !-- 子模块的version,如果不写就默认跟父模块的一样 -- version${children.version}/version !-- 自定义依赖,无需版本号 -- dependencies !--AMQP依赖包含RabbitMQ-- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency !-- 用于传递消息时的序列化操作 -- dependency groupIdcom.alibaba/groupId artifactIdfastjson/artifactId /dependency /dependencies /project3.新建一个mq-demo的模块1.在sun-frame下创建mq-demo2.然后在mq-demo下创建生产者和消费者子模块3.查看是否交给父模块管理了4.在mq-demo模块引入sun-common-rabbitmq依赖dependencies !-- 引入sun-common-rabbitmq -- dependency groupIdcom.sunxiansheng/groupId artifactIdsun-common-rabbitmq/artifactId version1.0-SNAPSHOT/version /dependency /dependencies5.publisher引入sun-common-test依赖dependencies !-- sun-common-test -- dependency groupIdcom.sunxiansheng/groupId artifactIdsun-common-test/artifactId version1.0-SNAPSHOT/version /dependency /dependencies6.将sun-common-rabbitmq clean-install一下7.给consumer和publisher都创建主类1.ConsumerApplication.javapackage com.sunxiansheng.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; SpringBootApplication ComponentScan(com.sunxiansheng) public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }2.PublisherApplication.javapackage com.sunxiansheng.publisher; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class PublisherApplication { public static void main(String[] args) { SpringApplication.run(PublisherApplication.class); } }4.测试MQ1.application.yml mq的最基本配置spring: # RabbitMQ 配置 rabbitmq: # 服务器地址 host: ip # 用户名 username: sunxiansheng # 密码 password: rabbitmq # 虚拟主机 virtual-host: / # 端口 port: 56722.consumer1.TestConfig.java MQ配置package com.sunxiansheng.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description: 最基本的MQ测试 * Author sun * Create 2024/8/2 14:34 * Version 1.0 */ Configuration public class TestConfig { /** * 创建一个fanout类型的交换机 * return */ Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(fanout.exchange.test); } /** * 创建一个队列 * return */ Bean public Queue fanoutQueueTest() { return new Queue(fanout.queue.test); } /** * 交换机和队列绑定 */ Bean public Binding binding() { return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange()); } }2.TestConfigListener.java 监听队列package com.sunxiansheng.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Description: 最基本的MQ测试 * Author sun * Create 2024/8/2 14:34 * Version 1.0 */ Component public class TestConfigListener { RabbitListener(queues fanout.queue.test) public void receive(String message) { System.out.println(接收到的消息 message); } }3.publisher1.TestConfig.java 测试(注意指定启动类)package com.sunxiansheng.consumer.config; import com.sunxiansheng.publisher.PublisherApplication; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * Description: 最基本的MQ测试 * Author sun * Create 2024/8/2 14:34 * Version 1.0 */ SpringBootTest(classes PublisherApplication.class) // 指定启动类 public class TestConfig { Resource private AmqpTemplate amqpTemplate; Test public void send() { // 交换机 String exchange fanout.exchange.test; // 路由键 String routingKey ; // 消息 String message hello fanout; // 发送消息 amqpTemplate.convertAndSend(exchange, routingKey, message); } }2.结果2.基本交换机1.Fanout1.FanoutConfig.java 交换机配置package com.sunxiansheng.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description: Fanout交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ Configuration public class FanoutConfig { Bean public FanoutExchange fanoutExchange1() { // 创建一个fanout类型的交换机 return new FanoutExchange(fanout.exchange); } Bean public Queue fanoutQueue1() { // 创建一个队列 return new Queue(fanout.queue1); } Bean public Queue fanoutQueue2() { // 创建一个队列 return new Queue(fanout.queue2); } // 两个队列绑定到交换机上 Bean public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1); } Bean public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1); } }2.FanoutConfigListener.java 监听者package com.sunxiansheng.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Description: Fanout交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ Component public class FanoutConfigListener { RabbitListener(queues fanout.queue1) public void receive1(String message) { System.out.println(fanout.queue1接收到的消息 message); } RabbitListener(queues fanout.queue2) public void receive2(String message) { System.out.println(fanout.queue2接收到的消息 message); } }3.FanoutConfig.java 生产者package com.sunxiansheng.consumer.config; import com.sunxiansheng.publisher.PublisherApplication; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * Description: Fanout交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ SpringBootTest(classes PublisherApplication.class) // 指定启动类 public class FanoutConfig { Resource private AmqpTemplate amqpTemplate; Test public void send() { // 交换机 String exchange fanout.exchange; // 路由键 String routingKey ; // 消息 String message hello fanout; // 发送消息 amqpTemplate.convertAndSend(exchange, routingKey, message); } }2.Direct1.DirectConfig.java 交换机配置package com.sunxiansheng.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description: Direct交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ Configuration public class DirectConfig { Bean public DirectExchange directExchange() { // 创建一个direct类型的交换机 return new DirectExchange(direct.exchange); } Bean public Queue directQueue1() { // 创建一个队列 return new Queue(direct.queue1); } Bean public Queue directQueue2() { // 创建一个队列 return new Queue(direct.queue2); } // 两个队列绑定到交换机上这里需要指定routingKey Bean public Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with(black); } Bean public Binding bindingDirectQueue2(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with(green); } }2.DirectConfigListener.java 监听者package com.sunxiansheng.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Description: Direct交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ Component public class DirectConfigListener { RabbitListener(queues direct.queue1) public void receive1(String message) { System.out.println(direct.queue1接收到的消息 message); } RabbitListener(queues direct.queue2) public void receive2(String message) { System.out.println(direct.queue2接收到的消息 message); } }3.DirectConfig.java 生产者package com.sunxiansheng.consumer.config; import com.sunxiansheng.publisher.PublisherApplication; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * Description: Direct交换机 * Author sun * Create 2024/7/29 15:06 * Version 1.0 */ SpringBootTest(classes PublisherApplication.class) // 指定启动类 public class DirectConfig { Resource private AmqpTemplate amqpTemplate; Test public void send() { // 交换机 String exchange direct.exchange; // 路由键 String routingKey black; // 消息 String message hello direct; // 发送消息 amqpTemplate.convertAndSend(exchange, routingKey, message); } }