從零開始的高併發(四)--- Zookeeper的分佈式隊列

前言

前情概要

上一篇中咱們提到了zookeeper僞集羣的簡單搭建,爲了提供可靠的zookeeper服務,咱們須要集羣的支持。html

集羣搭建中該注意的點有兩個,一個是zoo.cfg文件的參數配置,咱們往其加入了dataLogDir路徑來存放事務日誌,還有要給三個集羣的zoo.cfg文件都添加上集羣節點配置,二是myid文件,myid是一行只包含機器id的文本,id在集羣中必須是惟一的,其值應該在1~255之間,注意目錄不能放錯(dataDir的路徑下)且注意編寫時別輸入錯誤的字符便可。java

咱們還簡單地提到了paxos算法,根據一個小的場景描述了其流程,而且解釋了zookeeper中選舉算法的步驟,並結合打印出來的日誌信息分析了其步驟node

以往連接

從零開始的高併發(一)--- Zookeeper的基礎概念算法

從零開始的高併發(二)--- Zookeeper實現分佈式鎖數據庫

從零開始的高併發(三)--- Zookeeper集羣的搭建和leader選舉apache

內容一:(補充)zookeeper集羣的工做原理

zookeeper提供了重要的分佈式協調服務,它是如何保證集羣數據的一致性的?安全

① ZAB協議的簡單描述

ZAB(zookeeper atomic broadcast)---zookeeper 原子消息廣播協議是專門爲zookeeper設計的數據一致性協議,注意此協議最主要的關注點在於數據一致性,而無關乎於數據的準確性,權威性,實時性。服務器

ZAB協議過程網絡

1.全部事務轉發給leader(當咱們的follower接收到事務請求)
2.Leader分配全局單調遞增事務id(zxid,也就是相似於paxos算法的編號n),廣播協議提議
3.Follower處理提議,做出反饋(也就是承諾只接受比如今的n編號大的
4.leader收到過半數的反饋,廣播commit,把數據完全持久化(和2pc不一樣的是,2pc是要等待全部小弟反饋贊成)
5.leader對原來轉發事務的followe進行響應,follower也順帶把響應返回給客戶端
複製代碼

還記得咱們說過zookeeper比較適合讀比較多,寫比較少的場景嗎,爲何咱們說它效率高,咱們能夠知道,全部的事務請求,必須由一個全局惟一的服務器進行協調,這個服務器也就是如今的leader,leader服務器把客戶端的一個寫請求事務變成一個提議,這個提議經過咱們的原子廣播協議廣播到咱們服務器的其餘節點上去,此時這個協議的編號,也就是zxid確定是最大的。多線程

因爲咱們的zxid都是由leader管理的,在上一節也是講過,leader之因此能成爲leader,原本就是由於它的zxid最大,此時的事務請求過來,leader的zxid自己最大的基礎上再遞增,這樣新過來的事務的zxid確定就是最大的。那麼一連串的事務又是如何在leader中進行處理,leader中會內置一個隊列,隊列的做用就是用來保證有序性(zxid有序且隊列先進先出原則),因此後面來的事務不可能跳過前面來的事務。因此這也是ZAB協議的一個重要特性---有序性

② Leader崩潰時的舉措

leader服務器崩潰,或者說因爲網絡緣由致使leader失去了與過半follower的聯繫,那麼就會進入崩潰恢復模式

咱們回到上一節配置集羣節點配置時,提到了在配置各節點時

server.id = host:port:port
        id:經過在各自的dataDir目錄下建立一個myId的文件來爲每臺機器賦予一個服務器id,這個id咱們通常用基數數字表示
        兩個port:第一個follower用來鏈接到leader,第二個用來選舉leader
複製代碼

此時第二個port,就是崩潰恢復模式要使用到的

1.ZAB協議規定若是一個事務proposal(提案)在一臺機器上被處理成功,那麼應該在全部的機器上都被處理成功,
哪怕這臺機器已經崩潰或者故障
2.ZAB協議確保那些已經在leader服務器上提交的事務最終被全部服務器都提交
3.ZAB協議確保丟棄那些只在leader服務器上被提出的事務
複製代碼

因此此時咱們ZAB協議的選舉算法應該知足:確保提交已經被leader提交的事務proposal,同時丟棄已經被跳過的事務proposal

若是讓leader選舉算法可以保證新選舉出來的leader擁有集羣中全部機器的最高zxid的事務proposal,那麼就能夠保證這個新選舉出來的leader必定具備全部已經提交的提案,同時若是讓擁有最高編號的事務proposal的機器來成爲leader,就能夠省去leader檢查事務proposal的提交和丟棄事務proposal的操做。

③ ZAB協議的數據同步

leader選舉完成後,須要進行follower和leader的數據同步,當半數的follower完成同步,則能夠開始提供服務。

數據同步過程

leader服務器會爲每個follower服務器都準備一個隊列,並將那些沒有被各follower服務器同步的事務
以proposal的形式逐個發送給follower服務器,並在每個proposal消息後面接着發送一個commit消息,
表示該事務已經進行提交,直到follower服務器將全部還沒有同步的事務proposal都從leader上同步
併成功提交到本地數據庫中,leader就會將該follower加入到可用follower中
複製代碼

④ ZAB協議中丟棄事務proposal

zxid=高32位+低32位=leader週期編號+事務proposal編號
複製代碼

事務編號zxid是一個64位的數字,低32位是一個簡單的單調遞增的計數器,針對客戶端的每個事務請求,leader產生新的事務proposal的時候都會對該計數器進行+1的操做,高32位表明了leader週期紀元的編號。

每當選舉產生一個新的leader,都會從這個leader服務器上取出其本地日誌中最大事務proposal的zxid,並從zxid解析出對應的紀元值,而後對其進行+1操做,以後以此編號做爲新的紀元,並將低32位重置爲0開始生產新的zxid。

基於此策略,當一個包含了上一個leader週期中還沒有提交過的事務proposal的服務器啓動加入到集羣中,發現此時集羣中已經存在leader,將自身以follower角色鏈接上leader服務器後,leader服務器會根據自身最後被提交的proposal和這個follower的proposal進行比對,發現這個follower中有上一個leader週期的事務proposal後,leader會要求follower進行一個回退操做,回到一個確實被集羣過半機器提交的最新的事務proposal

⑤ zookeeper的可配置參數

能夠從官網上了解zookeeper的可配置參數

zookeeper.apache.org/doc/current…

雖然是全英,可是當你們有須要使用到它們的時候,那英文就天然不成問題了是吧

內容二:zookeeper的典型應用場景

數據發佈訂閱
命名服務
master選舉
集羣管理
分佈式隊列
分佈式鎖
複製代碼

1.分佈式隊列的應用場景

① 業務解耦

實現應用之間的解耦,這時全部的下游系統都訂閱隊列,從而得到一份實時完整的數據

解耦的應用很是普遍,好比咱們常見的發貨系統和訂單系統,之前業務串行的時候,發貨系統必定要等訂單系統生成完對應的訂單纔會進行發貨。這樣若是訂單系統崩潰,那發貨系統也沒法正常運做,引入消息隊列後,發貨系統是正常處理掉髮貨的請求,再把已發貨的消息存入消息隊列,等待訂單系統去更新並生成訂單,可是此時,訂單系統就算崩潰掉,咱們也不會一直不發貨。

② 異步處理

能夠看到在此場景中隊列被用於實現服務的異步處理,這樣作的好處在於咱們能夠更快地返回結果和減小等待,實現步驟之間的併發,提高了系統的整體性能等

③ 流量削峯

2.zk的分佈式隊列

① 邏輯分析

順序節點的應用,相似於咱們在用zookeeper實現分佈式鎖的時候如何去處理驚羣效應的作法。 且根據隊列的特色:FIFO(先進先出),入隊時咱們建立順序節點(ps:爲何上面咱們是用了順序節點而不是說是臨時順序節點,是由於咱們根本不考慮客戶端掛掉的狀況)並把元素傳入隊列,出隊時咱們取出最小的節點。使用watch機制來監聽隊列的狀態,在隊列滿時進行阻塞,在隊列空時進行寫入便可。

入隊操做

如上圖,咱們生產者須要對資源進行訪問時,會申請獲取一個分佈式鎖,若是未成功搶佔鎖,就會進行阻塞,搶到鎖的生產者會嘗試把任務提交到消息隊列,此時又會進行判斷,若是隊列滿了,就監聽隊列中的消費事件,當有消費隊列存在空位時進行入隊,未消費時阻塞。入隊時它會進行釋放鎖的操做,喚醒以前搶佔鎖的請求,並讓以後的生產者來獲取。

出隊操做

出隊和入隊的機制是十分類似的。

② JDK阻塞隊列操做

阻塞隊列:BlockingQueue---線程安全的阻塞隊列

它以4種形式出現,對於不能當即知足可是在未來某一時刻可能知足的操做,4種形式的處理方式皆不一樣

1.拋出一個異常
2.返回一個特殊值,true or false
3.在操做能夠成功前,無限阻塞當前線程
4.放棄前只在給定的最大時間限制內阻塞
複製代碼

咱們將會實現這個阻塞隊列接口來實現咱們的分佈式隊列

內容三:分佈式隊列的代碼實現

public class ZkDistributeQueue extends AbstractQueue<String> implements BlockingQueue<String> , java.io.Serializable
複製代碼

繼承了AbstractQueue,能夠省略部分基礎實現

① 基本的配置信息及使用到的參數

/**
 * zookeeper客戶端操做實例
 */
private ZkClient zkClient;

/**
 *  定義在zk上的znode,做爲分佈式隊列的根目錄。
 */
private String queueRootNode;
private static final String default_queueRootNode = "/distributeQueue";

/**隊列寫鎖節點*/
private String queueWriteLockNode;
/**隊列讀鎖節點*/
private String queueReadLockNode;
/**
 * 子目錄存放隊列下的元素,用順序節點做爲子節點。
 */
private String queueElementNode;

/**
 * ZK服務的鏈接字符串,hostname:port形式的字符串
 */
private String zkConnUrl;

private static final String default_zkConnUrl = "localhost:2181";

/** 
 * 隊列容量大小,默認Integer.MAX_VALUE,無界隊列。
 * 注意Integer.MAX_VALUE其實也是有界的,存在默認最大值
 **/
private static final int default_capacity = Integer.MAX_VALUE;
private int capacity;

/**
 * 控制進程訪問的分佈式鎖
 */
final Lock distributeWriteLock;
final Lock distributeReadLock;
複製代碼

首先咱們須要一個zkClient的客戶端,而後queueRootNode是分佈式隊列的存放元素的位置,指定了一個默認的根目錄default_queueRootNode,把隊列中的元素存放於/distributeQueue下,寫鎖節點表明往隊列中存放元素,讀鎖節點表明從隊列中去取元素,這個設計簡單點來講就是,queueRootNode做爲最大的目錄,其下有3個子目錄,分別是queueWriteLockNode,queueReadLockNode和queueElementNode,其餘的就是一些須要使用到的配置信息

② 構造器

提供兩個構造方法,一個爲使用默認參數實現,另一個是自定義實現

public ZkDistributeQueue() {
		this(default_zkConnUrl, default_queueRootNode, default_capacity);
	}
	
public ZkDistributeQueue(String zkServerUrl, String rootNodeName, int initCapacity) {
	if (zkServerUrl == null) throw new IllegalArgumentException("zkServerUrl");
	if (rootNodeName == null) throw new IllegalArgumentException("rootNodeName");
	if (initCapacity <= 0) throw new IllegalArgumentException("initCapacity");
	this.zkConnUrl = zkServerUrl;
	this.queueRootNode = rootNodeName;
	this.capacity = initCapacity;
	init();
	distributeWriteLock = new ZkDistributeImproveLock(queueWriteLockNode);
	distributeReadLock = new ZkDistributeImproveLock(queueReadLockNode);
}
複製代碼

此時在咱們分佈式鎖的構造器中,createPersistent()的參數true是指若是我父目錄queueRootNode並無事先建立完成,這個方法會自動建立出父目錄,這樣就不怕咱們在跑程序以前遺漏掉一些建立文件結構的工做

public ZkDistributeImproveLock(String lockPath) {
    if(lockPath == null || lockPath.trim().equals("")) {
        throw new IllegalArgumentException("patch不能爲空字符串");
    }
    this.lockPath = lockPath;
    client = new ZkClient("localhost:2181");
    client.setZkSerializer(new MyZkSerializer());
    if (!this.client.exists(lockPath)) {
        try {
            this.client.createPersistent(lockPath, true);
        } catch (ZkNodeExistsException e) {

        }
    }
}
複製代碼

③ 初始化隊列信息的init()方法

從新定義好讀鎖寫寫鎖和任務存放路徑,而後把zkClient鏈接上,建立queueElementNode做爲任務元素目錄,參數true上文做用已經提到了

/**
 * 初始化隊列信息
 */
private void init() {
	queueWriteLockNode = queueRootNode+"/writeLock";
	queueReadLockNode = queueRootNode+"/readLock";
	queueElementNode = queueRootNode+"/element";
	zkClient = new ZkClient(zkConnUrl);
	zkClient.setZkSerializer(new MyZkSerializer());
	if (!this.zkClient.exists(queueElementNode)) {
		try {
			this.zkClient.createPersistent(queueElementNode, true);
		} catch (ZkNodeExistsException e) {
			
		}
	}
}
複製代碼

④ 使用put()方法進行隊列元素入隊操做

// 阻塞操做
@Override
public void put(String e) throws InterruptedException {
	checkElement(e);
	
	//嘗試去獲取分佈式鎖
	distributeWriteLock.lock();
	try {
		if(size() < capacity) {	// 容量足夠
			enqueue(e);
			System.out.println(Thread.currentThread().getName() + "-----往隊列放入了元素");
		}else { // 容量不夠,阻塞,監聽元素出隊
			waitForRemove();
			put(e);
		}
	} finally {
	
        //釋放鎖
		distributeWriteLock.unlock();
	}
}
複製代碼

checkElement()方法是一個簡單的參數檢查,咱們也能夠定義有關於znode的命名規範的一些檢查,不過通常狀況下只要是String類型的參數都是沒有問題的

private static void checkElement(String v) {
    if (v == null) throw new NullPointerException();
    if("".equals(v.trim())) {
    	throw new IllegalArgumentException("不能使用空格");
    }
    if(v.startsWith(" ") || v.endsWith(" ")) {
    	throw new IllegalArgumentException("先後不能包含空格");
    }
}
複製代碼

size()方法也很簡單,就是先取得父目錄而後調用zkClient自帶的countChildren()方法得出結果返回便可

public int size() {
	int size = zkClient.countChildren(queueElementNode);
	return size;
}
複製代碼

從零開始的高併發(二)--- Zookeeper實現分佈式鎖中已經對等待移除的這個方法進行解釋,主要就是經過subscribeChildChanges()監聽子節點的數據變化,在size() < capacity條件成立時,就會喚醒等待隊列,而當size() >= capacity,就會判斷隊列已經被填滿,從而進行阻塞,

/**
 * 隊列容量滿了,不能再插入元素,阻塞等待隊列移除元素。
 */
private void waitForRemove() {
	CountDownLatch cdl = new CountDownLatch(1);
	// 註冊watcher
	IZkChildListener listener = new IZkChildListener() {
		@Override
		public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
			if(currentChilds.size() < capacity) {	// 有任務移除,激活等待的添加操做
				cdl.countDown();
				System.out.println(Thread.currentThread().getName() + "-----監聽到隊列有元素移除,喚醒阻塞生產者線程");
			}
		}
	};
	zkClient.subscribeChildChanges(queueElementNode, listener);
	
	try {
		// 確保隊列是滿的
		if(size() >= capacity) {
			System.out.println(Thread.currentThread().getName() + "-----隊列已滿,阻塞等待隊列元素釋放");
			cdl.await();	// 阻塞等待元素被移除
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	zkClient.unsubscribeChildChanges(queueElementNode, listener);
}
複製代碼

在waitForRemove()方法執行後,咱們的等待線程被喚醒,這時從新執行put(e),嘗試從新入隊

入隊操做由enqueue(e)來完成,就是建立順序節點的步驟

/**
 * 往zk中添加元素
 * @param e
 */
private void enqueue(String e) {
	zkClient.createPersistentSequential(queueElementNode+"/", e);
}
複製代碼

⑤ 消費操做take

@Override
public String take() throws InterruptedException {

    //老套路,先獲取鎖
	distributeReadLock.lock();
	try {
		List<String> children = zkClient.getChildren(queueElementNode);
		if(children != null && !children.isEmpty()) {
		
		    //先對children進行一個排序,而後取出第一個,也就是最小編號的節點
			children = children.stream().sorted().collect(Collectors.toList());
			String takeChild = children.get(0);
			String childNode = queueElementNode+"/"+takeChild;
			String elementData = zkClient.readData(childNode);
			
			//進行出隊操做
			dequeue(childNode);
			System.out.println(Thread.currentThread().getName() + "-----移除隊列元素");
			return elementData;
		}else {
		
		    //若是children原本就是空的,那就是沒有元素須要消費,那就繼續等待
			waitForAdd();		// 阻塞等待隊列有元素加入
			return take();
		} 
	} finally {
		distributeReadLock.unlock();
	}
}

//出隊操做

private boolean dequeue(String e) {
	boolean result = zkClient.delete(e);
	return result;
}
複製代碼

附:生產者和消費者的模擬

① 生產者

模擬了兩臺服務器,兩個併發,每睡3秒鐘就往消息隊列put

public class DistributeQueueProducerTest {
	public static final String queueRootNode = "/distributeQueue";
	
	public static final String zkConnUrl = "localhost:2181";
	
	public static final int capacity = 20;
	
	public static void main(String[] args) {
		startProducer();
	}
	
	public static void startProducer() {
		// 服務集羣數
		int service = 2;
		// 併發數
		int requestSize = 2;
		
		CyclicBarrier requestBarrier = new CyclicBarrier(requestSize * service);
		// 多線程模擬分佈式環境下生產者
		for (int i = 0; i < service; i++) {
			new Thread(new Runnable() {
				public void run() {
					// 模擬分佈式集羣的場景
					BlockingQueue<String> queue = new ZkDistributeQueue(zkConnUrl, queueRootNode, capacity);
					
					System.out.println(Thread.currentThread().getName() + "---------生產者服務器,已準備好---------------");
					
					for(int i =0; i < requestSize; i++) {
						new Thread(new Runnable() {
							@Override
							public void run() {
								try {
									// 等待service臺服務,requestSize個請求 一塊兒出發
									requestBarrier.await();
								} catch (InterruptedException | BrokenBarrierException e) {
									e.printStackTrace();
								}
								while(true) {
									try {
										queue.put("123");
										System.out.println(Thread.currentThread().getName() + "-----進入睡眠狀態");
										TimeUnit.SECONDS.sleep(3);
										System.out.println(Thread.currentThread().getName() + "-----睡眠狀態,醒來");
									} catch (InterruptedException e) {
										e.printStackTrace();
									}
								}
							}
							
						}, Thread.currentThread().getName()+"-request-" + i).start();
					}
				}
			}, "producerServer-" + i).start();
			
		}
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}
複製代碼

② 消費者

public class DistributeQueueConsumerTest {

	public static void main(String[] args) {
		satrtConsumer();
	}
	
	public static void satrtConsumer() {
		// 服務集羣數
		int service = 2;
		// 併發數
		int requestSize = 2;
		
		CyclicBarrier requestBarrier = new CyclicBarrier(requestSize * service);
		
		// 多線程模擬分佈式環境下消費者
		for (int i = 0; i < service; i++) {
			new Thread(new Runnable() {	// 進程模擬線程
				public void run() {
					// 模擬分佈式集羣的場景
					BlockingQueue<String> queue = new ZkDistributeQueue(zkConnUrl, queueRootNode, capacity);

					System.out.println(Thread.currentThread().getName() + "---------消費者服務器,已準備好---------------");
					
					for(int i =0; i < requestSize; i++) {	// 操做模擬線程
						new Thread(new Runnable() {
							@Override
							public void run() {
								try {
									// 等待service臺服務,requestSize個請求 一塊兒出發
									requestBarrier.await();
								} catch (InterruptedException | BrokenBarrierException e) {
									e.printStackTrace();
								}
								while(true) {
									try {
										queue.take();
										System.out.println(Thread.currentThread().getName() + "-----進入睡眠狀態");
										TimeUnit.SECONDS.sleep(3);
										System.out.println(Thread.currentThread().getName() + "-----睡眠狀態,醒來");
									} catch (InterruptedException e) {
										e.printStackTrace();
									}
								}
							}
							
						}, Thread.currentThread().getName()+"-request-" + i).start();
					}
				}
			}, "consummerServer-" + i).start();
		}
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}
複製代碼

執行結果

① 先執行生產者

此時沒有消費者去進行消費,因此隊列沒一會兒就滿了,咱們須要注意,阻塞的不只僅是隊列,分佈式鎖也被阻塞了。

② 啓動消費者

基本上是生產者放入一個消費者就消費一個的狀態。從而證實該分佈式隊列已經正常工做了。


finally

篇幅和上次同樣比較長,主要是補充了上次沒講到的集羣的ZAB協議和zookeeper的其中一個分佈式隊列的應用場景,其實在平常開發中使用zookeeper來實現隊列是基本不會發生的,比較常見的都是activeMQ,rabbitMQ,kafka等等。不過仍然有必要去了解隊列的基本工做思路。咱們也至關於本身手寫了一個擁有基礎功能的MQ

你必定聽過dubbo+zookeeper的萬金油組合配置中心究竟是什麼?zookeeper又是如何依靠自身機制來實現配置中心的?

下一篇:從零開始的高併發(五)--- Zookeeper的經典應用場景2