架構設計之NodeJS操做消息隊列RabbitMQ

一. 什麼是消息隊列?

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。javascript

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。java

二. 經常使用的消息隊列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。node

甚至如今部分NoSQL也可作消息隊列,如Redis。git

三. 消息隊列的使用場景?

  • 異步處理

  • 應用解耦

  • 流量削峯

四. 使用案例

上規模的公司都會有本身的日誌分析系統,日誌系統是怎麼實現的呢?github


圖解:用戶在訪問應用的時候,咱們要記錄下用戶的操做記錄和系統的異常日誌,常規的作法是將系統產生的日誌保存到服務器磁盤,在服務器中開啓定時任務,定時將磁盤的日誌信息傳入mq中(生產者),也定時將mq中的消息取出並存到相應的數據庫,如ElasticSearch或Hive中。數據庫

五. 如何安裝RabbitMQ?

上面的案例介紹了MQ的一個使用場景,我這裏是用RabbitMQ舉例,現實項目中可能用到的是Kafka。瀏覽器

  1. 首先安裝brew(mac爲例)ruby

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"複製代碼
  2. 安裝RabbitMQbash

    brew install rabbitmq複製代碼
  3. 運行RabbitMQ服務器

    進入到 /usr/local/Cellar/rabbitmq/3.7.7,執行

    localhost:3.6.6 lidong$ sbin/rabbitmq-server複製代碼
  4. 啓動插件

    進入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management複製代碼
  5. 登錄管理界面

    打開瀏覽器輸入:http://localhost:15672,RabbitMQ默認15672端口


六. Nodejs操做RabbitMQ

網上能夠找到好幾個相應的Node SDK,這裏推薦amqplib

  1. 生產者

/** * 對RabbitMQ的封裝 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.hosts = []; this.index = 0; this.length = this.hosts.length; this.open = amqp.connect(this.hosts[this.index]); } sendQueueMsg(queueName, msg, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName).then(function (ok) { return channel.sendToQueue(queueName, new Buffer(msg), { persistent: true }); }) .then(function (data) { if (data) { errCallBack && errCallBack("success"); channel.close(); } }) .catch(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index == 0; } }); } } 複製代碼/** * 對RabbitMQ的封裝 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.hosts = []; this.index = 0; this.length = this.hosts.length; this.open = amqp.connect(this.hosts[this.index]); } sendQueueMsg(queueName, msg, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName).then(function (ok) { return channel.sendToQueue(queueName, new Buffer(msg), { persistent: true }); }) .then(function (data) { if (data) { errCallBack && errCallBack("success"); channel.close(); } }) .catch(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index == 0; } }); } } 複製代碼

2. 消費者

/** * 對RabbitMQ的封裝 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.open = amqp.connect(this.hosts[this.index]); } receiveQueueMsg(queueName, receiveCallBack, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName) .then(function (ok) { return channel.consume(queueName, function (msg) { if (msg !== null) { let data = msg.content.toString(); channel.ack(msg); receiveCallBack && receiveCallBack(data); } }) .finally(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index = 0; self.open = amqp.connect(self.hosts[0]); } }); }複製代碼/** * 對RabbitMQ的封裝 */ let amqp = require('amqplib'); class RabbitMQ { constructor() { this.open = amqp.connect(this.hosts[this.index]); } receiveQueueMsg(queueName, receiveCallBack, errCallBack) { let self = this; self.open .then(function (conn) { return conn.createChannel(); }) .then(function (channel) { return channel.assertQueue(queueName) .then(function (ok) { return channel.consume(queueName, function (msg) { if (msg !== null) { let data = msg.content.toString(); channel.ack(msg); receiveCallBack && receiveCallBack(data); } }) .finally(function () { setTimeout(() => { if (channel) { channel.close(); } }, 500) }); }) }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index = 0; self.open = amqp.connect(self.hosts[0]); } }); }複製代碼

3. 經過生產者向MQ發送一個消息,並建立隊列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})複製代碼

執行以後,咱們打開管理平臺,發現RabbbitMQ已經接受到了一條消息:

而且RabbbitMQ新增了一個隊列testQueue


4. 獲取指定隊列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg)
})
// 輸出結果:my first message複製代碼

此時打開RabbitMQ管理平臺,消息數量已經變爲0:


綜上:咱們簡單講述了消息隊列及RabbitMQ相關的一些知識,以及咱們如何經過nodejs來生產與消費消息,上面講的比較簡單,以後會發表更多文章講述消息隊列集羣搭建及容災的實現。