消息隊列的使用<二>:ActiveMQ的基本使用(Java)

[toc]html

首發時間:2019-05-16java


ActiveMQ

<br>linux

介紹

  • ActiveMQ是Apache旗下的一款開源的消息隊列(消息中間件MOM,Message Oriented Middleware)
  • 它徹底支持JMS,支持JMS1.1和J2EE 1.4 規範。
  • 支持多種網絡協議。
  • 兼容多種語言(C,C++,Java,Python,PHP)
  • 能夠方便地與spring進行整合開發。
  • (其餘的一些說的太多可能也不是很懂,之後想了解再本身瞭解吧,畢竟這裏只是個小博文。)

<br> <br>spring

下載、安裝和初次運行

1.下載:從ActiveMQ官網下載ActiveMQ,地址:http://activemq.apache.org/download.html 2.安裝:下載下來的是一個壓縮包,解壓即安裝,直接解壓到一個目錄便可; 3.初次運行:(在啓動ActiveMQ前,請先要已經安裝和配置好JDK)在windows版本的activemq中在activeMQ/bin下面有兩個目錄,爲win32,win64,根據本身的系統位數進入不一樣的目錄,而後直接雙擊目錄下的activemq.bat (在linux中爲: ./activemq start) 4.檢測是否啓動:ActiveMQ默認使用61616端口提供JMS服務,使用8161端口提供管理控制檯服務,咱們能夠直接訪問activemq的管理控制檯網頁來肯定是否已經開始服務:localhost:8161/admin,默認的用戶名和密碼都是admin,輸入後將進入以下的界面: 5.關閉ActiveMQ:windosw中直接ctrl+c關閉cmd窗口(在linux中: ./activemq stop)apache

<br>編程

Java上初次使用activeMQ

這裏以PTP模型的生產者和消費者的消息傳遞爲例。windows

<br>服務器

1.首先導入依賴包,以maven爲例:網絡

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

2.生產者發送消息(以PTP方式爲例):【這裏是符合上面的「JMS應用開發基本步驟」的】session

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {

    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty,參數是activemq的服務地址,前綴tcp表明是tcp鏈接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.使用ConnectionFactory建立connnect,並啓動connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.使用Connection建立session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(這裏以PTP爲例,因此目的地是一個Queue),參數是Queue的名字
        Destination destination = session.createQueue("tempqueue");
        //5.建立生產者,第一個參數是目的地,此時建立的生產者要與目的地進行綁定。
        MessageProducer producer = session.createProducer(destination);
        //6.使用session建立消息,這裏使用TEXT類型的消息
        TextMessage textMessage = session.createTextMessage("hello world!");
        //7.生產者發送消息
        producer.send(textMessage);
        //8.提交事務
        session.commit();
        //9.關閉資源
        session.close();
        connection.close();
    }
}

3.消費者接收消息(以PTP方式爲例):

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.建立connnect,並啓動connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.建立session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地【消費者與生產者的目的地相同才能進行消息傳遞】
        Destination destination = session.createQueue("tempqueue");
        //5.建立消費者,第一個參數是目的地,此時建立的消費者要與目的地進行綁定。
        MessageConsumer consumer = session.createConsumer(destination);
        //6.使用消費者接受消息消息
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println(message.getText());
        //8.提交事務
        session.commit();
        //9.關閉資源
        session.close();
        connection.close();
    }
}

上述代碼解析: 1.前半部分代碼都是同樣的,都是建立ConnectionFactory,Connection,Session 2.而後建立目的地Destination,這個目的地就是要把消息存儲到哪裏和從哪裏取消息。 3.若是是生產者,那麼由Session來建立生產者,建立的時候傳入一個目的地,來與生產者綁定,生產者調用send發送的消息都會存儲到目的地中。生產者發送的消息須要使用Session來建立,調用createXXXMessage來建立消息,建立什麼類型的消息取決於使用什麼方法來建立。 4.若是是消費者,那麼由Session來建立消費者,建立的時候傳入一個消費者,來與消費者綁定,消費者調用receive時會從目的地中獲取消息。獲取到的結果是一個XXXMessage,一般須要轉成對應類型的Message,而後再調用對應的獲取消息體的方法來獲取消息體。例如TextMessage類型的消息要獲取消息體須要調用getText()。 5.若是使用了事務,那麼須要session.commit() 6.最後關閉全部資源

<br> <br>

設置請求屬性:

設置標準屬性:使用消息調用setJMS開頭的方法。【要注意的是爲了不發生過時的消息,任何直接經過編程方式來調用setJMSExpiration()方法都會被忽略。 】 設置自定義屬性:使用消息調用setXXXProperty的方法。 接受屬性:

<br> <br>

可靠性機制

在上面的概念學習中你應該瞭解到,若是不使用事務來進行消息肯定,那麼須要手動使用消息來調用acknowledge來確認消息。【並且這時候是在會話層進行確認,因此在這個會話中只要一條消息進行了確認,其餘消息也會被確認(即便他收了兩條消息只確認了一條)】 當使用了事務的時候,代碼中就不要使用acknowledge了,會影響消息的確認。

<br> <br>

事務

在上面的概念學習有提到了事務,事務可使一系列操做共同成功或失敗。下面來演示一下事務的使用。 1.首先,在建立Session的時候第一個參數是是否使用事務,要使用事務須要賦值TRUE。 2.提交事務使用session.commit(),回滾是session.rollback() 3.對於生產者,事務是確保消息發送的一致性;對於消費者,事務是確保消息消費的一致性, 4.對於事務的測試可使用單步運行來測試,在發消息處打斷點,測試未commit時消費者是否能取到消息。

<br> <br>

消息消費方式

在上面的概念學習中有談到消費者消費消息的兩種方式,一種是堵塞的receive,一種是監聽器。監聽器與receive最大的區別是若是沒有,那麼監聽器不會等,而receive會等。因此監聽器適用於不但願堵塞程度運行的場景。

receive

上面的代碼都是使用堵塞的receive來接收的,你應該能夠留意到當運行了消費者後,沒有取到消息的時候會一直堵塞在那裏。receive也能夠設置阻塞時間,時間到了就再也不等了。

監聽器:

public class Consumer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("tempqueue2");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage msg = (TextMessage) message;
                try {
                    System.out.println("..."+msg.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        session.commit();
        session.close();
        connection.close();
    }
}

<br> <br>

消息類型

在上面有介紹到消息體有不一樣的類型,TextMessage,MapMessage等。 不一樣的消息相似影響存儲到消息隊列的消息的格式,也影響取消息的方式,取消息的方式要與消息類型對應。

TextMessage:數據類型相似於字符串。使用getText()來獲取數據。 <br>

MapMessage:數據類型相似於Map。使用getXXX(key)來獲取數據。 <br>

<br> <br>

發佈/訂閱模式

上面主要講的都是PTP模式,下面來說一下發布/訂閱模式。在上面的概念學習中,有涉及到多消費者廣播、持久化訂閱,下面將演示這些概念的實際使用。

非持久訂閱

先演示非持久訂閱的,因爲非持久訂閱只能發送給在線的消費者,因此先運行消費者(多個)。【非持久訂閱的消息接收與PTP同樣可使用receive】 而後建立生產者發生消息: 注意:在非持久化訂閱中,一般要使消費者持續receive,因此一般使用while循環來接受消息。

Message message = consumer.receive();
while(message!=null){
   TextMessage txtMsg = (TextMessage)message;
   sysout(textmsg.getText());
   message = consumer.receive();
}

持久化訂閱

要進行持久化訂閱,首先要將生產者的發送模式改爲持久化模式,這個設置要在connection.start()以前 而後消費者要建立持久訂閱器,並且要在消息發送以前先運行一次把持久化訂閱器註冊到消息隊列上。 【注意:須要在鏈接上設置消費者ID,用來識別消費者,持久化訂閱器識別消費者依靠消費者ID,若是不設置,那麼下一次「上線」的時候,因爲消費者ID會變化,致使訂閱器沒法與消費者進行關聯】

public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.建立connnect,並啓動connnect
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("001");

        //3.建立session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("tempTopic");
        TopicSubscriber s1 = session.createDurableSubscriber((Topic) destination, "s1");
        connection.start();
        Message msg = s1.receive();
        while (msg!=null){
            TextMessage txtmsg = (TextMessage) msg;
            System.out.println(txtmsg.getText());
            session.commit(); // 因爲此時使用了while,因此要在裏面commit
            msg= s1.receive();
        }
        session.close();
        connection.close();
    }
}

<br> <br>

Broker

Broker是ActiveMQ服務器實例。 在使用獨立的ActiveMQ程序的時候,有時候會建立不一樣需求的服務器實例,一般來講都是使用某個配置文件進行建立。 而也是能夠經過代碼內建一個Broker的,內建的Broker比較小巧,適用於一些但願把Broker整合到項目中的場景。

1.經過BrokerService建立:

上述代碼報錯java.lang.NoClassDefFoundError:com/fasterxml/jackson/databind/ObjectMapper時可能缺少如下依賴包:

<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

2.經過BrokerFactory建立:

使用BrokerFactory建立時須要一個配置文件,這個配置文件的路徑要被傳入到BrokerFactory中,在上面的代碼中就是properties:broker.properties,配置文件裏面是這個Broker的參數。 下面是broker.properties的一個例子:

useJms=true
persistent=false
brokerName=FactoryBroker

對於能配置什麼,能夠參考使用BrokerService建立Broker時的setXXX。

<br> <br>

整合spring開發

如今不少的項目都使用到了Spring,因此這裏也講一下與Spring的整合。 首先理一下,ActiveMQ有什麼能夠交給Spring來管理的?能夠說能夠交給Spring管理的只有Destination,ConnectionFactory,Connection和Broker,只有這幾個實例的複用性比較強,這幾個的管理會在JmsTemplate使用中展現。 <br>

若是你學過Hibernate之類的框架,你應該知道Spring對Hibernate提供HibernateTemplate來整合,HibernateTemplate封裝了Hibernate的一些方法,簡化了使用,在使用HibernateTemplate的時候,dao層的類須要繼承HibernateDaoSupport,而後同時須要在類中注入Connection,這樣HibernateTemplate才能夠正常工做。JmsTemplate相似於HibernateTemplate,不過它是面向JMS的。

使用JmsTemplate

1.首先要編寫Spring配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--管理destination-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/> <!-- 目的地名稱 -->
    </bean>
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="spring-topic"/> <!-- 目的地名稱 -->
    </bean>
    
    <!--管理connectionFactory-->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--管理JmsTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <property name="defaultDestination" ref="destinationQueue"></property>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
        </property>
    </bean>

</beans>

2.而後建立測試類:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;
import javax.jms.*;

@RunWith(SpringJUnit4ClassRunner.class) //使用junit4進行測試
@ContextConfiguration(locations={"classpath:applicationContext.xml"}) //加載配置文件
public class JmsTemplateDemo {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="destinationQueue")
    private Destination destinationQueue;
    //發送
    @Test
    public  void send() throws JMSException {
        String msg = "hello world!";
        jmsTemplate.send(destinationQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });

    }
    //接收
    @Test
    public  void recv() throws JMSException {
        TextMessage message = (TextMessage) jmsTemplate.receive(destinationQueue);
        System.out.println(message.getText());
    }
}

在上面的代碼中可使用jmsTemplate.send來發送消息,使用jmsTemplate.receive來接收消息。對於消息確認和事務管理則不須要關心,JmsTemplate會本身處理的。

監聽器

在上面的JmsTemplate接收消息中使用了receive來接收消息,Spring還支持使用監聽器來接收消息,配置監聽器,來達到一有消息就執行某些操做,這樣就省去了消費者的代碼。 1.首先,須要須要在spring配置文件中配置DefaultMessageListenerContainer【還有其餘幾種監聽器】:

<!--配置DefaultMessageListenerContainer -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <!--監聽器 -->
        <property name="messageListener" ref="myMessageListener"></property>
        <!--監聽哪一個目的地 -->
        <property name="destination" ref="destinationQueue"></property>
     </bean>

    <!--配置監聽器-->
    <bean id="myMessageListener" class="withspring.MyMessageListener">
    </bean>

2.而後,定義一個監聽器:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("recv:"+ msg);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new IllegalArgumentException("消息類型錯誤!");
        }
    }
}

3.最後,運行隨便在這個項目中的一個發送消息的測試方法。

<br> <br>

使用spring集合Broker

1.經過BrokerService:

<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
           
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="brokerName" value="SpringBroker"/>
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:61616</value>
                </list>
            </property>
        </bean>
    </beans>

【還能夠經過BrokerFactory或BrokerFactoryBean來建立,這裏省略】 【固然,上面的是比較簡單的,沒有進行權限管理,你也登陸不了管理頁,想要肯定是否建立成功能夠監聽接口也能夠進行生產和消費消息】

<br> <br>


後續可擴展內容

<br> 這裏只是一篇小博客,寫不了太多東西。若是想要了解更精細,能夠去買書來看。 下面寫一下後續可擴展學習的內容,學不學由我的考慮。 <br> * 傳輸協議【上面介紹了tcp://localhost:61616,其實還能夠容許非TCP的鏈接】 * 消息存儲持久化【消息是怎麼進行存儲的】 * KahaDB * AMQ * JDBC * 內存存儲 * 部署與集羣 * 優化

<br>

相關文章
相關標籤/搜索