RocketMQ.1-快速入門

學習RocketMQ的第一天,應該從官網的QuickStart案例開始,這一節就來介紹一下如何部署單機RocketMQ以及進行消息的收發。java

0. 版本說明

使用RocketMQ須要有以下的硬件要求:git

  • 64位操做系統
  • JDK 1.8+
  • Maven 3.2.x
  • Git
  • 4GB+ 硬盤空間(broker 存儲須要)

瞭解版本說明以後,咱們就能夠開始進行實戰了。github

Ps: RocketMQ版本爲Release.4.7.0apache

1. 獲取源碼

打開RocketMQGithub上的主頁,獲取倉庫地址。而後在本地電腦上克隆本倉庫。服務器

git clone https://github.com/apache/rocketmq.git

2. 啓動服務器

2.1 啓動 nameserver

打開項目後,第一步要作的是啓動nameserver,這是RocketMQ的路由中心,它提供輕量級服務發現和路由,主要的做用是存儲路由信息,管理broker節點,包括路由的查找、註冊和刪除。app

RocketMQ工程的namesrv包中找到入口類org.apache.rocketmq.namesrv.NamesrvStartup,運行這個類的main函數,發現報錯了。ide

Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation

這個報錯是由於在爲nameserver設置相關配置時沒有設置成功。函數

if (null == namesrvConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
    System.exit(-2);
}

ROCKETMQ_HOME環境變量主要用於設置nameserver的配置,只須要將包含conf配置目錄的這個路徑賦值給環境變量ROCKETMQ_HOME便可,以下圖。學習

2020060601

再次運行main函數,就會發現啓動成功。ui

The Name Server boot success. serializeType=JSON

2.2 啓動 broker

接下來要啓動的是broker,它主要用於消息存儲,接收和發送。

一樣在RocketMQ工程的broker包中找到入口類org.apache.rocketmq.broker.BrokerStartup,可是與啓動nameserver不一樣的是,啓動broker時須要指定註冊的nameserver地址,在啓動命令中輸入-n 127.0.0.1:9876便可。

2020060602

運行main函數,若是發現與以前同樣的報錯,從新設置該Application環境變量便可,運行成功的輸出以下。

The broker[daxiongMac.local, 192.168.31.126:10911] boot success. serializeType=JSON and name server is namesrvAddr=127.0.0.1:9876

至此,RocketMQ的路由中心和接收發消息的服務器就啓動成功了,咱們能夠經過nameserverbroker來進行消息傳遞了。

3. 啓動 Producer 發送消息

找到example包的org.apache.rocketmq.example.quickstart.Producer類,這是一個最簡單的消息生產者,咱們來看一下它的源碼。

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
  1. 使用DefaultMQProducer類來建立生產者實例,並指定消息組Group和路由中心地址。
  2. 啓動生產者實例。
  3. 建立消息,指定TopicTag(用於區分消息的類別)。
  4. 發送消息。
  5. 關閉生產者實例。
21:22:35.450 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E4E0000, offsetMsgId=C0A81F7E00002A9F0000000000068BA2, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=1], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E810001, offsetMsgId=C0A81F7E00002A9F0000000000068C54, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=2], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E850002, offsetMsgId=C0A81F7E00002A9F0000000000068D06, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=3], queueOffset=500]
......

這樣就實現了RocketMQ的發送消息。

4. 啓動 Consumer 消費消息

找到example包的org.apache.rocketmq.example.quickstart.Consumer類,這是一個最簡單的消息消費者,咱們來看一下它的源碼。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
  1. 使用DefaultMQPushConsumer類來建立消費者實例,並指定消息組Group、路由中心地址、消費模式、消息類別。
  2. 註冊消息監聽器,監聽消息,消費消息,返回消費成功標識。
  3. 啓動生產者實例。
21:24:03.482 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=2, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756319, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756321, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F00000000000691E4, commitLogOffset=430564, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844575, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9F0009, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=1, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756316, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756317, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F0000000000069132, commitLogOffset=430386, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844576, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9C0008, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]]
......

至此,咱們就完成了RocketMQ的快速入門,啓動nameserverbroker,建立生產者發送消息,建立消費者接收消息。

版權聲明:本文爲 Planeswalker23所創,轉載請帶上原文連接,感謝。