RabbitMQ 簡單隊列的使用

簡單隊列一般爲一個生產者、一個消費者、一個隊列的結構。php

安裝依賴

# composer.json
{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}
> composer.phar install

模式結構

image.png

生產者

生產者鏈接到RabbitMQ,發送一條消息,而後退出。json

# send.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立隊列,已存在的隊列不會重複建立
$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
// 經過默認的交換機發送消息到隊列 (消息內容, 默認交換機, 路由鍵)
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

消費者

消費者監聽來自 RabbitMQ 的消息,一般須要一直保持運行狀態以監聽消息。composer

# receive.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立隊列,已存在的不會重複建立
$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定義消息處理回調函數
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

運行

打開一個終端,運行消費者:函數

php receive.php

打開另外一個終端,運行生產者:工具

php send.php

使用 rabbitmqctl 工具查看隊列

# Linux
sudo rabbitmqctl list_queues
# Windows
rabbitmqctl.bat list_queues