什麼?無限緩衝的隊列(二)?

chanx

上篇文章咱們提到,當咱們建立一個有緩衝的通道並指定了容量,那麼在這個通道的生命週期內,咱們將再也沒法改變它的容量。 由此引起了關於無限緩存的 channel 話題討論。
咱們分析了一個實現無限緩衝的代碼。 最後,咱們也提到了它還能夠繼續優化的點。緩存

鳥窩的 chanx 正是基於此方案改造而成的,咱們來看看他倆的不一樣之處。數據結構

上篇文章說過,所謂的無限緩衝,無非是藉助一箇中間層的數據結構,暫存臨時數據。app

chanx 中,結構是這樣的:優化

type UnboundedChan struct {
    In     chan<- T    // channel for write
    Out    <-chan T    // channel for read
    buffer *RingBuffer // buffer
}

inout 的職責在上篇文章已經說明,這裏的 buffer 就是咱們所謂的中間臨時存儲層。其中的 RingBuffer 結構咱們後面再說。spa

func NewUnboundedChan(initCapacity int) UnboundedChan {
    return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}

func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
    in := make(chan T, initInCapacity)
    out := make(chan T, initOutCapacity)
    ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}

    go process(in, out, ch)

    return ch
}

它提供了兩個初始化 UnboundedChan 的方法,從代碼中咱們能夠明顯的看出,NewUnboundedChanSize 能夠給每一個屬性自定義本身的容量大小。僅此而已。code

chanx 中 關於 inout 都是帶緩衝的通道,而上篇文章中的 inout 都是無緩衝的通道。
這和他們對數據的流轉處理有很大關係。生命週期

咱們接下去看 process(in,out,ch) 最核心的方法。
image隊列

這時候,咱們再放上一篇核心代碼。ci

image

能夠很明顯他們看出它倆的區別。資源

上篇從 in 通道讀數據會先 appendbuffer,而後從 buffer 中取數據寫入 out 通道。
chanxin 通道取出數據先嚐試寫入 out(沒有中間商賺差價?),只有在 out 已經滿的狀況下,才塞入到 buffer

chanx 還有一段小細節代碼。
image

能走到這裏,必定是由於 out 通道滿了。咱們把值追加到 buffer 的同時,須要嘗試把 buffer 中的數據寫入 out
此時 in 通道也許還在持續的寫入數據, 爲了不 in 通道塞滿,阻塞業務寫入,咱們同時須要嘗試從 in 通道中讀數據追加到 buffer

buffer

上篇文章我提到了關於 buffer 優化的點。

chanx 是如何優化的?

// type T interface{}
type RingBuffer struct {
    buf         []T 
    initialSize int
    size        int
    r           int // read pointer
    w           int // write pointer
}

這是 buffer 的結構,其中

  • buf 具體存儲數據的結構。
  • initialSize 初始化化 buf 的長度
  • size 當前 buf 的長度
  • r 當前讀數據位置
  • w 當前寫入數據位置

buffer 本質上就是一個環形的隊列,目的是達到資源的複用。
而且當 buffer 滿時,提供自動擴容的功能。

咱們來看具體把數據寫入 buffer 的源碼。
image

接着看擴容。
image

這段代碼惟一難理解的就是數據遷移了。這裏的數據遷移目的是爲了保證先入先出的原則。

可能加了註釋有些人也沒法理解,那麼就再加一個草率圖。

假設咱們 buffer 的長度是 8。 當前讀和寫的 index 都是5。說明 buffer 滿了,觸發自動擴容規則,進行數據遷移。

那麼遷移的過程就是下圖這樣的。

image

還有,當 buffer 爲空而且當前的 size 比初始化 size 還大,那麼能夠考慮重置 buffer 了。

//if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { 
//                        ch.buffer.Reset()
//                    }
func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}

剩下的代碼,就沒什麼好說的了。

總結

繼上篇文章後,這篇文章咱們主要講解了 chanx 是如何實現無限緩衝的 channel
其中最重要的一個點在於 chanxbuffer 實現採用的是 ringbuffer,達到資源複用的同時還能自動擴容。