SpringBoot整合RabbitMQ
SpringBoot与RabbitMQ集成非常筒単,不需要做任何的额外设置只需要两步即可:
step1:引入相关依赖:spring-boot-starter-amqp
step2:対application.properties迸行配置

生产端核心配置

消费端核心配置

SpringBoot整合RabbitMQ实战
1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring
Starter Project

2.添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
然后复制一份工程,重命名为rabbitmq-springboot-consumer,修改pom相关artifactId,name及description

生产端工程
添加生产端配置

# rabbitmq连接基本配置
spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
创建配置类

@Configuration
@ComponentScan({"com.rxy.springboot.*"})
public class MainConfig {

}
消息的confirm和return机制
publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback

publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback
注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效
生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等
创建生产端处理类

import java.util.Map;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //确认机制
    final ConfirmCallback confirmCallback = new
RabbitTemplate.ConfirmCallback() {
        /**
         * correlationData: 回调的相关数据,包含了消息ID
         * ack: ack结果,true代表ack,false代表nack
         * cause: 如果为nack,返回原因,否则为null
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //做一些补偿机制等
                System.err.println("异常处理....");
            }
        }
    };
    //返回机制
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback()
{
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message
message, int replyCode, String replyText, 
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ",
routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ",
replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws
Exception {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        //注意导包
        Message msg = MessageBuilder.createMessage(message, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 ,保证全局唯一 ,这个是实际消息的ID
        //在做补偿性机制的时候通过ID来获取到这条消息进行重发
        String id = "1234567890";
        CorrelationData correlationData = new CorrelationData(id);
        //exchange, routingKey, object, correlationData
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg,
correlationData);
    }
}
在管控台创建topic交换机exchange-1和队列queue-1,并建立绑定关系为springboot.#
测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqSpringbootProducerApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
    @Test
    public void testSender1() throws Exception {
         Map<String, Object> properties = new HashMap<>();
         properties.put("number", "12345");
         properties.put("send_time", simpleDateFormat.format(new Date()));
         rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    }
}
运行测试方法,打印如下内容

correlationData: CorrelationData [id=1234567890]
ack: true
将发送消息convertAndSend的routingKey修改为spring.hello,再次运行测试方法,打印如下内容

return exchange: exchange-1, routingKey: spring.abc, replyCode: 312,
replyText: NO_ROUTE
correlationData: CorrelationData [id=1234567890]
ack: true
消费端工程
首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列(不推荐)、根据业务记录日志等处理
可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
@RabbitMQListener注解
消费端监听@RabbitMQListener注解,这个在实际工作中非常的好用。
@RabbitListener是一个组合注解,里面可以注解配置:
@QueueBinding、@Queue、 @Exchange直接通过这个组合注解一次性搞定消费端交换机、 队列、绑定、路由、并且配置监听功能等
由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置

消费端配置

spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10
消费端消息处理类

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.rxy.springboot.entity.Order;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag =
(Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
前面生产端已经发送了一条消息到queue-1队列,可以再运行发送一条,然后运行消费端工程启动类Application,打印如下

--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!
--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!
其实@RabbitListener会自动声明队列、交换机及绑定关系,可以在管控台删除对应的队列和交换机,然后重新运行进行测试

使用配置方式
将队列、交换机、绑定关系使用配置方式,并且消息体内容使用java对象

首先增加相关的配置
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
另外两个工程都增加实体类Order,注意要发送java对象,必须实现序列化接口

public class Order implements Serializable {
    private String id;
    private String name;
}
消费类增加Order对象消息的处理方法
@Payload: 接收消息的消息体对象
@Headers: 接收消息的属性
AmqpHeaders: 抽象类,里面包含了消息的常用属性key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value =
"${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value =
"${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions =
"${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, 
            Channel channel, 
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
生产端发送方法,直接发送java对象

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Order order) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("0987654321");
        rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order,
correlationData);
    }
生产端测试方法

    @Test
    public void testSender2() throws Exception {
         Order order = new Order("001", "第一个订单");
         rabbitSender.sendOrder(order);
    }
运行说明
先启动消费端,然后启动生产端

# 生产端打印
correlationData: CorrelationData [id=0987654321]
ack: true
# 消费端打印
--------------------------------------
消费端order: 001
 

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信