RabbitMQ如何高效的消費消息

在上篇介紹瞭如何簡單的發送一個消息隊列以後,咱們本篇來看下RabbitMQ的另一種模式,工做隊列ide

什麼是工做隊列

咱們上篇文章說的是,一個生產者生產了消息被一個消費者消費了,以下圖學習

上面這種簡單的消息隊列確實能夠處理咱們的任務,可是當咱們隊列中的任務過多,處理每條任務有須要很長的耗時,那麼使用一個消費者處理消息顯然不不夠的,因此咱們能夠增長消費者,來共享消息隊列中的消息,進行任務處理。spa

也就是以下圖code

雖然上圖我只花了一個生產者A,那麼同理,能有多個消費者,那也能多個生產者。隊列

代碼

發送消息

public class Send {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 獲取鏈接
        Connection connection = MQConnectUtil.getConnection();

        // 建立通道
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 模擬發送20條消息
        for (int i = 0; i < 20; i++) {

            String msg = "消息:" + i;

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            Thread.sleep(i * 20);

            System.out.println(msg);
        }

        channel.close();
        connection.close();
    }
}

消費者A

public class Consumer1 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 獲取鏈接
        Connection connection = MQConnectUtil.getConnection();

        // 建立頻道
        Channel channel = connection.createChannel();

        // 隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消費者[A]-內容:" + msg);

                Thread.sleep(2 * 1000);
            }
        };

        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者B

public class Consumer2 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 獲取鏈接
        Connection connection = MQConnectUtil.getConnection();

        // 建立頻道
        Channel channel = connection.createChannel();

        // 隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消費者[B]-內容:" + msg);

                Thread.sleep(1000);
            }
        };

        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

咱們來看下消費者A和消費者B的消費狀況rem

  • 消費者B

  • 消費者A

有沒有發現什麼問題,我總過模擬發送了20條消息,細心的同窗能夠發現,消費者A和消費者B消費了一樣多的消息,都消費了10天,可是我在消費者A和消費者B中,什麼sleep不通的時長,按道理說消費者B要比消費者A處理消息的速度塊,處理的消息更多,那麼爲何會產生這樣的緣由?get

RabbitMQ工做隊列的默認配置

默認狀況下,RabbitMQ會將每一個消息依次發送給下一個消費者,每一個消費者收到的消息數量實際上是同樣的,咱們把這種分發消息的方式稱爲輪訓分發模式消息隊列

本篇咱們就簡單介紹這麼多內容,有心學習的童鞋必定要敲敲代碼,看不必定能看會,只有本身敲一遍,纔能有所理解。it

更多內容請關注:io