RabbitMQ 發佈訂閱的使用

發佈訂閱的應用場景一般爲一個生產者、一個交換機,多個消費者各自建立隊列綁定到交換機上訂閱消息並消費。php

發佈訂閱一般配合如下設置共同使用:json

  • 消費者建立的隊列隨機命名便可
  • 關閉隊列持久化
  • 關閉消息持久化
  • 消費者建立的隊列在鏈接關閉時自動刪除
  • 開啓自動消息確認

安裝依賴

# composer.json
{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}
> composer.phar install

模式結構

image.png

在 RabbitMQ 消息傳遞模型中,生產者是不會向隊列直接發送消息的,只能將消息發送給交換機。composer

交換機接收來自生產者的消息,而後將它們推送到隊列中。函數

發佈訂閱使用的是fanout 交換機,這個交換機很是簡單,將它收到的全部消息廣播到它綁定的全部隊列。ui

生產者

生產者鏈接到RabbitMQ,發送一條消息,而後退出。spa

# send.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 定義一個名爲 logs 的 fanout 廣播交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}

// 
$msg = new AMQPMessage($data);

// 將消息發送到名爲 logs 的 fanout 廣播交換機 (消息內容, 交換機, 路由鍵);
$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

消費者

消費者監聽來自RabbitMQ的消息,一般須要一直保持運行狀態以監聽消息。日誌

# receive.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立鏈接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 定義一個名爲 logs 的 fanout 廣播交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 建立一個隨機命名的新隊列,第三個參數爲關閉隊列持久化,第四個參數爲當聲明它的鏈接關閉時隊列會被自動刪除
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 將隨機命名的隊列綁定到 fanout 廣播交換機,生產者向交換機發送消息將被廣播到綁定的隊列中
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定義回調函數
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

// 第四個參數設爲true開啓自動消息確認,即投遞消息後馬上標記爲刪除
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

運行

打開一個終端,運行消費者,將日誌放到文件中:code

php receive.php > logs_from_rabbit.log

打開另外一個終端,運行消費者,將日誌輸出到終端:rabbitmq

php receive.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

打開另外一個終端,運行生產者:隊列

php send.php

查看全部交換機

sudo rabbitmqctl list_exchanges

查看全部的綁定關係

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.