乐趣区

关于后端:elixir-0063-为代码解耦而生的双生子-PubSub-与消息队列

在之前的文章 elixir! #0061 高负载高并发问题的万能钥匙 —- 队列 (queue) 中,咱们介绍了如何应用队列来防止 server 在收到多个耗时较长的 call 申请时被阻塞住 mailbox。明天咱们再来讨论一下另一种常见的消息传递模式 —— PubSub。

PubSub 和音讯队列十分相似,次要的区别是 PubSub 个别实用于同一个音讯有多个消费者同时关注的场景。例如,多人在线的直播间,电商实时更新的库存信息等等。比拟侧重于性能,而非音讯的达到。相同之处在于音讯的生产者和消费者是互相解耦的,音讯是发送到某个 topic 里,而非间接发给对方,所以生产者的累赘会减小。音讯可能会须要有一个保留机制,可能是长久化地保留到硬盘上,也可能是只在内存中停留一段时间,也可能是间接发送,不做任何长久化,这样不在线的消费者就会失落音讯。

PubSub 的实质是职责的拆散:生产者的职责是要精确地生产音讯,把音讯投递到正确的 topic,而不必去关怀谁会读到这个音讯。同时,消费者也不必关怀是谁生产了这个音讯,而只须要关注音讯的 topic 和内容。

所以 PubSub server 的职责就是将音讯投递给 topic 的关注者们。这是一个工夫复杂度 O(n) 的操作,咱们始终须要遍历某个 topic 的 subscriber 列表。此外,对某个 topic 的关注者列表,会须要做常常的批改:新增关注,勾销关注,掉线,都须要减少或者删除列表的内容,如果一个 topic 有上万个关注者,就应该思考这些操作的耗时。

这里实现了一个超繁难的 pubsub:

defmodule M6 do
  use GenServer

  def start do
    GenServer.start(__MODULE__, :ok)
  end

  def pub(server, topic, msg) do
    GenServer.call(server, {:pub, topic, msg})
  end

  def sub(server, topic) do
    GenServer.call(server, {:sub, topic})
  end

  def unsub(server, topic) do
    GenServer.call(server, {:unsub, topic})
  end

  @impl true
  def init(_) do
    {:ok, %{topics: %{}}}
  end

  @impl true
  def handle_call({:pub, topic, msg}, _from, state) do
    case state.topics do
      %{^topic => topic_state} ->
        broadcast(topic_state, msg)

      _ ->
        nil
    end

    {:reply, :ok, state}
  end

  def handle_call({:sub, topic}, {pid, _ref}, %{topics: topics} = state) do
    _monitor_ref = Process.monitor(pid)

    topic_state =
      case state.topics do
        %{^topic => topic_state} ->
          topic_state

        _ ->
          MapSet.new()
      end

    {:reply, :ok, %{state | topics: Map.put(topics, topic, add_client(topic_state, pid))}}
  end

  def handle_call({:unsub, topic}, {pid, _ref}, %{topics: topics} = state) do
    topic_state =
      case state.topics do
        %{^topic => topic_state} ->
          topic_state

        _ ->
          %{}
      end

    {:reply, :ok, %{state | topics: Map.put(topics, topic, delete_client(topic_state, pid))}}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _}, state) do
    topics =
      Enum.reduce(state.topics, %{}, fn {t, ts}, acc ->
        Map.put(acc, t, delete_client(ts, pid))
      end)

    {:noreply, %{state | topics: topics}}
  end

  defp add_client(topic_state, client) do
    MapSet.put(topic_state, client)
  end

  defp delete_client(topic_state, client) do
    MapSet.delete(topic_state, client)
  end

  defp broadcast(topic_state, msg) do
    Enum.each(topic_state, fn pid ->
      send(pid, msg)
    end)
  end
end

测试一下:

iex(30)> {:ok, s} = M6.start                                   
{:ok, #PID<0.216.0>}
iex(31)> :sys.trace s, true                                    
:ok
iex(32)> M6.sub s, "jobs"                                      
*DBG* <0.216.0> got call {sub,<<"jobs">>} from <0.149.0>
*DBG* <0.216.0> sent ok to <0.149.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map =>
                                                                  #{<0.149.0> =>
                                                                        []},
                                                              version => 2}}}
:ok
iex(33)> spawn(fn -> M6.pub(s, "jobs", "backend engineer") end)
*DBG* <0.216.0> got call {pub,<<"jobs">>,<<"backend engineer">>} from <0.220.0>
#PID<0.220.0>
*DBG* <0.216.0> sent ok to <0.220.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map =>
                                                                  #{<0.149.0> =>
                                                                        []},
                                                              version => 2}}}
iex(34)> flush                                                 
"backend engineer"
:ok
iex(35)> M6.unsub s, "jobs"                                    
*DBG* <0.216.0> got call {unsub,<<"jobs">>} from <0.149.0>
*DBG* <0.216.0> sent ok to <0.149.0>, new state #{topics =>
                                                      #{<<"jobs">> =>
                                                            #{'__struct__' =>
                                                                  'Elixir.MapSet',
                                                              map => #{},
                                                              version => 2}}}
:ok
退出移动版