[elixir! #0061] 高負載高併發問題的萬能鑰匙 ---- 隊列(queue)

高負載高併發問題,不只僅出如今面試中,在平常生活中也很常見,好比周末去熱鬧的商場吃飯,餐廳們口常常會須要排隊取號。能夠概括爲「需求」和「資源」的不匹配,多出來的「需求」的不到知足,就須要有合適的機制讓這些」需求「進行 等待 或者 撤銷。面試

讓咱們用 elixir 模擬這樣一個場景:一個服務器裏一共有 3 個座位,每進來1個客人就要佔用一個座位,座位佔滿以後服務器就沒法提供服務。緩存

defmodule M5 do
  use GenServer

  @seats 3
  @wait_timeout 1000

  def start() do
    GenServer.start(__MODULE__, :ok)
  end

  def enter(pid) do
    GenServer.call(pid, :enter, @wait_timeout)
  end

  def leave(pid) do
    GenServer.cast(pid, :leave)
  end

  def init(_) do
    {:ok, @seats}
  end

  def handle_call(:enter, {_pid, _ref}, seats) do
    IO.puts("got enter request")

    if seats > 0 do
      {:reply, :ok, print(seats - 1)}
    else
      {:noreply, print(seats)}
    end
  end

  def handle_cast(:leave, seats) do
    IO.puts("free seats: #{seats}")
    {:noreply, print(seats + 1)}
  end

  defp print(seats) do
    IO.puts("free seats: #{seats}")
    seats
  end
end

再定義這樣一個函數,模擬客人們同時要求進入服務器,若是得不到響應,就會 BOOM!服務器

def concurrent_enter(pid, n, t) do
    for _ <- 1..n do
      spawn(fn ->
        try do
          enter(pid)
          :timer.sleep(t)
          leave(pid)
        catch
          _, _ ->
            IO.puts("BOOM!")
        end
      end)
    end
  end

在同時進來的客人小於3人時,一切都很好,然而咱們知道實際狀況確定不會是這樣,同時出現的客人必定會大於3人。
咱們知道這裏一共就3個座位,因此不管如何不能夠同時處理超過3位客人。可是好消息是,每一個客人有1秒鐘的等待耐心,因此只要在客人失去耐心以前有座位空出來,咱們就不至於丟掉這位客人。
因此按理說只要在 1秒鐘以前,有客人離開,新的客人就能夠進來,咱們來試試看是否是這樣。設置同時進入的客人數量爲4,每位客人用餐時間爲 500 毫秒:併發

iex(8)> concurrent_enter s, 4, 500
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3
BOOM!

BOOM!爲何會這樣?咱們注意到第4爲客人請求進入時,是沒有空座的,然而座位空出來以後,他也沒有獲得任何通知,也就是他並不知道有空座了。
一種簡單的解決方案就是使用隊列。讓等待中的客人進入隊列排隊,每次服務器裏有客人離開,就檢查一下等待隊列。只須要對咱們的代碼作以下修改:函數

def init(_) do
    {:ok, %{seats: @seats, queue: :queue.new()}}
  end

  def handle_call(:enter, {_pid, _ref} = from, %{seats: seats} = state) do
    IO.puts("got enter request")

    if seats > 0 do
      {:reply, :ok, do_enter(state)}
    else
      handle_overload(from, state)
    end
  end

  defp do_enter(%{seats: seats} = state) do
    %{state | seats: print(seats - 1)}
  end

  def handle_overload(from, %{queue: queue} = state) do
    {:noreply, %{state | queue: :queue.in(from, queue)}}
  end

  def handle_cast(:leave, %{seats: seats} = state) do
    IO.puts("free seats: #{seats}")

    {:noreply,
     state
     |> do_leave()
     |> check_queue()}
  end

  defp do_leave(state) do
    %{state | seats: print(state.seats + 1)}
  end

  defp check_queue(%{queue: queue} = state) do
    case :queue.out(queue) do
      {:empty, _queue} ->
        state

      {{:value, from}, queue} ->
        GenServer.reply(from, :ok)

        %{state | queue: queue}
        |> do_enter()
    end
  end

如今咱們能夠挑戰一些刺激的:6人同時請求進入服務器,這是咱們理論上能夠達到的最高負載:高併發

iex(21)> concurrent_enter s, 6, 500
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3

Perfect! 注意到每當有座位空出來,立刻就會被等待隊列裏的客人使用。spa


覺得事情就這樣愉快的結束了麼?不,讓咱們模擬一下同時有 6 位客人進入,每位用餐時間是 1100 毫秒:code

iex(25)> concurrent_enter s, 6, 1100
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
BOOM!  
BOOM!    
BOOM!     
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0

在咱們意料之中的是,後3位客人沒能在 timeout 以前進入服務器。然而,服務器並不知道他們已經失去耐心了,依舊在有空位出現後通知他們進入服務器。這些客人變成了可怕的殭屍客人,他們永遠不會離開服務器,致使服務器裏的空位始終爲0.server

咱們能夠限制客人的最長用餐時間,然而這樣殭屍客人依舊會佔用咱們大量的時間。更好的方法是要求客人們在發送 enter 請求的時候就附帶上他們的最大耐心(wait_timeout),而後計算出客人失去耐心的時間輟(deadline)。若是有空位出現時,等待隊列裏面的客人已經失去耐心,那麼服務器就能夠直接跳過他,隊列

作了修改以後的代碼變成了這樣:

def enter(pid, wait_timeout) do
    GenServer.call(pid, {:enter, wait_timeout}, wait_timeout)
  end

  ...

  def handle_call({:enter, timeout}, {_pid, _ref} = from, %{seats: seats} = state) do
    IO.puts("got enter request")

    if seats > 0 do
      {:reply, :ok, do_enter(state, from)}
    else
      handle_overload({from, timeout}, state)
    end
  end

  defp do_enter(%{requests: requests} = state, from) do
    case requests do
      %{^from => %{deadline: deadline}} ->
        state = %{state | requests: Map.delete(requests, from)}

        if past_deadline?(deadline) do
          state
          |> check_queue()
        else
          handle_enter(state, from)
        end

      _ ->
        handle_enter(state, from)
    end
  end

  defp past_deadline?(deadline) do
    :os.system_time(:millisecond) > deadline
  end

  defp handle_enter(%{seats: seats} = state, from) do
    GenServer.reply(from, :ok)
    %{state | seats: print(seats - 1)}
  end

  def handle_overload({from, timeout}, %{queue: queue, requests: requests} = state) do
    request_info = %{deadline: :os.system_time(:millisecond) + timeout}

    {:noreply,
     %{state | queue: :queue.in(from, queue), requests: Map.put(requests, from, request_info)}}
  end

  ...

  def concurrent_enter(pid, n, wait_timeout) do
    for _ <- 1..n do
      spawn(fn ->
        try do
          enter(pid, wait_timeout)
          :timer.sleep(1000)
          leave(pid)
        catch
          err, msg ->
            IO.puts("BOOM!" <> inspect({err, msg}))
        end
      end)
    end
  end

爲了簡化問題,咱們把每位客人的用餐時間固定爲 1000 毫秒,而後把concurrent_enter的第三個參數修改成客人的耐心時間(wait_timeout). 咱們就能夠構造這種情形:

# 來了 6 位耐心爲 500 毫秒的客人
    concurrent_enter(pid, 6, 500)
    # 100 毫秒以後
    :timer.sleep(100)
    # 來了 2 位耐心爲 2000 毫秒的客人
    concurrent_enter(pid, 2, 2000)

模擬的結果代表殭屍客人能夠被馬上辨識出來而且跳過,徹底不影響服務正常客人:

got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
got enter request
got enter request
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3

至此,咱們擁有了一個相對智能的資源服務器了,他能夠在有空餘資源時馬上回復等待隊列中的請求,而且在請求超時時將其跳過。

技術總結

  • 對於有限的資源,使用隊列(queue)的方式將請求先緩存下來
  • 使用 checkout(至關於本文裏的 enter) 和 checkin(本文裏的 leave)的方式去佔用和歸還資源
  • 能夠經過簡單的 noreply call 的方式來實現不阻塞的 server
  • 在有空閒資源可用時,及時通知 client
  • 經過記錄請求的超時時間,來在處理時跳過那些已經 dead 了的請求