RabbitMQ - 消息預取

RabbitMQ - 消息確認這篇文章中,提到了消息預取,避免了rabbitmq一直往消費端發送數據,致使消費端出現無限制的緩衝區問題。消息預取定義了信道上或者消費者容許的最大未確認的消息數量。一旦未確認數達到了設置的值,RabbitMQ將中止傳遞更多消息,除非至少有一條未完成的消息獲得確認。
使用消息預取的時候,會調用chanel的basicQos方法,prefetchCount是未確認的消息數,global默認值爲false,是限制消費者未確認的消息數,設置爲true的時候,是限制信道上的未確認消息數。web

void basicQos(int prefetchCount, boolean global) throws IOException;

消費者限制

global設置爲false,當每一個消費者有2個未確認的消息時,不能再發消息給消費者。segmentfault

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("qos", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
   /* DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };*/
    channel.basicQos(2, false);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {
    });
    /*channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {
    });*/
}

運行後,往隊列發送了4條消息,能夠看到,未發送(ready)有2個,未確認2個。
image.png
控制檯確實只收到了兩個消息。
image.png
若是把註釋放開,同時有兩個消費者,能夠看到,未發送(ready)有0個,未確認4個。
image.png
控制檯結果以下,兩個都消費了兩個。
image.png服務器

信道限制

把上面兩個消費者的global改成true,改成信道限制的方式。異步

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("qos", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    channel.basicQos(2, true);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {
    });
    channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {
    });
}

能夠看到,未發送(ready)有2個,未確認2個。
image.png
每一個消費者都只消費了一個。由於此時,信道上未確認的消息數是2。
image.pngfetch

混合模式

即設置了信道限制又設置了消費者限制,那結果是怎麼樣的呢?
先設置消費端只能有2個未確認的消息,通道只能有3個未確認的消息。spa

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("qos", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    channel.basicQos(2, false);
    channel.basicQos(3, true);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {
    });
    channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {
    });
}

運行後控制檯以下,打印了三個消息,說明整個信道就只能有三個未確認的消息,第一個消費者有兩個未確認的消息後再也不接收,由第二個消費者接收。
image.png
web控制檯信息,確實只有3個未確認的消息,還有1個待發送。
image.png
注意,若是換了順序呢?
把下面的代碼code

channel.basicQos(2, false);
channel.basicQos(3, true);

換成,先控制信道的未確認的消息是3個,再控制消費者未確認的消息是2個rabbitmq

channel.basicQos(3, true);
channel.basicQos(2, false);

運行後,控制檯以下,每一個消費者都2個未確認的消息。此時信道的限制不生效了。
image.png隊列