[elixir! #0063] 爲代碼解耦而生的雙生子 —— PubSub 與消息隊列

在之前的文章《elixir! #0061 高負載高併發問題的萬能鑰匙 —— 隊列(queue)》中,我們介紹了如何使用隊列來避免 server 在收到多個耗時較長的 call 請求時 mailbox 被阻塞。今天我們再來討論另一種常見的消息傳遞模式 —— PubSub。

PubSub 和消息隊列非常相似,主要的區別是 PubSub 一般適用於同一個消息有多個消費者同時關注的場景,例如多人在線的直播間、電商實時更新的庫存信息等等。PubSub 比較側重於性能,而非保證消息一定送達。兩者的相同之處在於消息的生產者和消費者是相互解耦的:消息是發送到某個 topic 裏,而非直接發給對方,因此生產者的負擔會減少。消息可能需要有一個保存機制:可能是持久化地保存到硬盤上,也可能只在內存中停留一段時間,也可能是直接發送、不做任何持久化,這樣不在線的消費者就會丟失消息。

PubSub 的本質是職責的分離:生產者的職責是準確地生產消息,把消息投遞到正確的 topic,而不用去關心誰會讀到這個消息。同時,消費者也不用關心是誰生產了這個消息,只需要關注消息的 topic 和內容。

所以 PubSub server 的職責就是將消息投遞給 topic 的關注者們。這是一個時間複雜度為 O(n) 的操作,我們始終需要遍歷某個 topic 的 subscriber 列表。此外,某個 topic 的關注者列表需要經常修改:新增關注、取消關注、掉線,都需要增加或者刪除列表的內容。如果一個 topic 有上萬個關注者,就應該考慮這些操作的耗時。

這裏實現了一個超簡易的 PubSub:

defmodule M6 do
  @moduledoc """
  A minimal in-memory PubSub server.

  The server state is `%{topics: %{topic => MapSet.t(pid)}}`. Every subscriber
  is monitored, so when a subscriber process goes down it is removed from every
  topic it had joined.
  """
  use GenServer

  @doc "Starts an unlinked PubSub server. Returns `{:ok, pid}`."
  def start do
    GenServer.start(__MODULE__, :ok)
  end

  @doc """
  Sends `msg` (as-is, without any envelope) to every process subscribed to
  `topic`. Publishing to a topic with no subscribers is a no-op. Returns `:ok`.
  """
  def pub(server, topic, msg) do
    GenServer.call(server, {:pub, topic, msg})
  end

  @doc "Subscribes the calling process to `topic`. Returns `:ok`."
  def sub(server, topic) do
    GenServer.call(server, {:sub, topic})
  end

  @doc """
  Unsubscribes the calling process from `topic`. Returns `:ok`, also when the
  caller was not subscribed or the topic has never been used.
  """
  def unsub(server, topic) do
    GenServer.call(server, {:unsub, topic})
  end

  @impl true
  def init(_) do
    {:ok, %{topics: %{}}}
  end

  @impl true
  def handle_call({:pub, topic, msg}, _from, %{topics: topics} = state) do
    # Delivery is O(number of subscribers of `topic`); an unknown topic yields
    # an empty set and therefore sends nothing.
    topics
    |> Map.get(topic, MapSet.new())
    |> broadcast(msg)

    {:reply, :ok, state}
  end

  def handle_call({:sub, topic}, {pid, _ref}, %{topics: topics} = state) do
    # A fresh monitor is created on every subscription, so a process that
    # subscribed several times produces several :DOWN messages. That is
    # harmless because removing a pid that is already gone is idempotent.
    Process.monitor(pid)

    topic_state = Map.get(topics, topic, MapSet.new())

    {:reply, :ok, %{state | topics: Map.put(topics, topic, add_client(topic_state, pid))}}
  end

  def handle_call({:unsub, topic}, {pid, _ref}, %{topics: topics} = state) do
    case topics do
      %{^topic => topic_state} ->
        {:reply, :ok, %{state | topics: Map.put(topics, topic, delete_client(topic_state, pid))}}

      _ ->
        # Unknown topic: nothing to remove. Leave the state untouched rather
        # than creating an entry (and never hand a non-MapSet to MapSet.delete/2).
        {:reply, :ok, state}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{topics: topics} = state) do
    # A monitored subscriber went down: drop it from every topic.
    topics =
      Map.new(topics, fn {topic, topic_state} ->
        {topic, delete_client(topic_state, pid)}
      end)

    {:noreply, %{state | topics: topics}}
  end

  # Any other message is unexpected; ignore it instead of crashing the server
  # with a FunctionClauseError.
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  # Adds `client` to a topic's subscriber set.
  defp add_client(topic_state, client) do
    MapSet.put(topic_state, client)
  end

  # Removes `client` from a topic's subscriber set (no-op if absent).
  defp delete_client(topic_state, client) do
    MapSet.delete(topic_state, client)
  end

  # Sends `msg` to every subscriber pid in the set.
  defp broadcast(topic_state, msg) do
    Enum.each(topic_state, &send(&1, msg))
  end
end

測試一下:

iex(30)> {:ok, s} = M6.start                                   
{:ok, #PID<0.216.0>}
iex(31)> :sys.trace s, true                                    
:ok
iex(32)> M6.sub s, "jobs"                                      
*DBG* <0.216.0> got call {sub,<<"jobs">>} from <0.149.0>
*DBG* <0.216.0> sent ok to <0.149.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map =>
                                                                  #{<0.149.0> =>
                                                                        []},
                                                              version => 2}}}
:ok
iex(33)> spawn(fn -> M6.pub(s, "jobs", "backend engineer") end)
*DBG* <0.216.0> got call {pub,<<"jobs">>,<<"backend engineer">>} from <0.220.0>
#PID<0.220.0>
*DBG* <0.216.0> sent ok to <0.220.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map =>
                                                                  #{<0.149.0> =>
                                                                        []},
                                                              version => 2}}}
iex(34)> flush                                                 
"backend engineer"
:ok
iex(35)> M6.unsub s, "jobs"                                    
*DBG* <0.216.0> got call {unsub,<<"jobs">>} from <0.149.0>
*DBG* <0.216.0> sent ok to <0.149.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map => #{},
                                                              version => 2}}}
:ok