消息中間件 二 之AMQP實戰(下) RabbitMQ springboot實踐

實現內容

RabbitMQ + springboot 實現消息的發送和監聽 springboot版本2.1.8spring

直接上代碼

配置類springboot

@Configuration
public class RabbitConfig {

    // mq地址
    @Bean(value = "connectionFactory")
    @Primary
    public ConnectionFactory connectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String virtualHost) {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);

        return connectionFactory;
    }
    @Bean
    public RetryTemplate retryTemplate() {

        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(500);
        backOffPolicy.setMultiplier(10.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        return retryTemplate;
    }

    // mq發送
    @Bean
    public AmqpTemplate myMQTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setRetryTemplate(retryTemplate());
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        return template;
    }
}

MQ生產者服務器

@Component
public class MQSender {

    @Autowired
    private AmqpTemplate myMQTemplate;

    public void send(String exchangeName, String routingKey, Object object) {

        myMQTemplate.convertAndSend(exchangeName, routingKey, object);
    }
}

須要注意, 這裏沒有生成exchange, 須要手動建立, 若是須要程序自動建立, 則須要將exchange聲明爲bean便可. 執行測試代碼測試

@Autowired
private MQSender mqSender;

@Test
public void send() {

    mqSender.send("e.send", "r.send", "send a message");
}

若是你觀察的及時, 估計還能看見exchange收到消息的曲線波動~ 由於沒有消費者, 因此exchange在接收到信息後直接將消息丟棄了, 如今咱們建立對應的隊列, 並綁定. 再次執行就能夠看到隊列中有消息了. code

下面是消費者 首先咱們要在配置類中增長監聽配置, 一個自動ack, 一個手動ackblog

// 自動ack
@Bean(value = "listenerFactoryWithAutoAck")
    public SimpleRabbitListenerContainerFactory listenerFactoryWithAutoAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(newRentConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
// 手動ack
@Bean(value = "listenerFactoryWithManualAck")
public SimpleRabbitListenerContainerFactory listenerFactoryWithManualAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) {

        SimpleRabbitListenerContainerFactory factory = listenerFactory(newRentConnectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

若是你使用的springboot版本是2.2.*的話, 能夠更方便的在監聽的註解上設置ackMode, 而且會覆蓋在配置監聽工廠的配置方式, 這裏使用的版本是2.1.8因此只能在配置工廠時設置.rabbitmq

@Component
public class MQListener {

    @RabbitListener(containerFactory = "listenerFactoryWithManualAck",
            queues = "q.send")
    public void consume(Message message, Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

        System.out.println("message: " + message.toString());

        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("message body: " + msg);
    }
}

執行後看到消息並無被消費, 這是由於咱們使用手動應答監聽, 可是沒有發送應答, 服務器將消息從新入隊列. 在監聽中加入代碼, false表示只是響應這條信息, true表示全部信息. channel.basicAck(tag, false);隊列

啓動程序能夠看到信息被消費了. 在程序拋異常時, 可能須要手動處理異常, 拒絕消息. true表示消息從新入隊列, 還能夠被消費; false表示直接丟棄消息 channel.basicReject(tag, true);ip

以上就是RabbitMQ在springboot中的簡單實用get