RabbitMQ 工做隊列的使用

工做隊列的應用場景一般爲一個生產者、一個隊列、多個消費者公平消費隊列。php

工做隊列一般配合如下設置共同使用:json

  • 開啓隊列持久化
  • 開啓消息持久化
  • 開啓公平分發
  • 關閉自動消息確認
  • 手動確認消息

安裝依賴

# composer.json
{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}
> composer.phar install

模式結構

image.png

生產者

生產者鏈接到RabbitMQ,發送一條消息,而後退出。composer

# send.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立隊列,已存在的不會重複建立,第三個參數爲開啓隊列持久化
$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}

// 第二個參數 delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT 爲設置消息持久化
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

// 經過默認的交換機發送消息到隊列 (消息內容, 默認交換機, 路由鍵);
$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

消費者

消費者監聽來自RabbitMQ的消息,一般須要一直保持運行狀態以監聽消息。函數

# receive.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立隊列,已存在的不會重複建立,第三個參數爲開啓隊列持久化
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定義回調函數
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    // 手動消息確認
    $msg->ack();
};

// 設置 prefetch_count = 1,開啓公平分發(默認爲循環分發)
// 在處理並確認上一條消息以前,不要將新消息發送給消費者,而發送給其餘消費者
$channel->basic_qos(null, 1, null);

// 第四個參數設爲false關閉自動消息確認,爲true打開自動消息確認即投遞消息後馬上標記爲刪除
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

運行

打開一個終端,運行消費者:fetch

php receive.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

打開另外一個終端,運行消費者:ui

php receive.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

打開另外一個終端,運行生產者:spa

php send.php First message.
php send.php Second message..
php send.php Third message...
php send.php Fourth message....
php send.php Fifth message.....

注意事項

  1. 關閉自動消息確認必須記得要手動確認,不然會致使消息沒法釋放,內存消耗愈來愈大。
  2. RabbitMQ 不容許使用不一樣的參數從新定義已有隊列,將返回錯誤,如非持久化的隊列設置爲持久化的隊列等。

查看未確認消息

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged