乐趣区

关于后端:MQTT-在-Elixir-中的应用

简介

最近读到一本好书,书名是《通过 Elixir 和 Nerves 搭建气象站》,书中介绍了如何引入 Elixir 作为构建嵌入式应用程序的工具。

通过应用 Nerves,咱们能够在反对网络的设施上运行 Elixir 代码,并且与一些控制软件交互。

下面提到的书次要关注点在 Nerves,应用 HTTP 协定进行网络交互。只管在许多状况下这是一个正当的抉择,但我想介绍另一个宽泛用于生产物联网 (IoT) 设置的抉择:MQTT。

MQTT 协定

MQTT 是一种专为物联网 (IoT) 设施通信而设计的音讯传输协定。它广泛应用于许多畛域,例如银行、石油和天然气、制造业等。

MQTT 协定有很多长处,局部如下所示:

  • 它是一种轻量级的二进制协定,通常在 TCP/IP 协定之上运行。
  • 它专为网络不牢靠的场景设计,是户外装置的现实抉择。
  • 它遵循公布 / 订阅模式,简化客户端逻辑。

咱们将在设置中演示 MQTT 的一些劣势。

MQTT Broker

MQTT 的一个重要特色是它简化了客户端逻辑,这对于嵌入式设施至关重要。这是通过公布 / 订阅模式实现的:在 MQTT 中,没有“服务器”的概念。相同,所有参加实体都是连贯到所谓 broker 的客户端。客户端 订阅主题 向它们公布音讯,broker 进行路由(以及许多其余事件)。

一个好的用于生产的 broker,如 EMQ X,通常不仅提供 MQTT 路由性能,还提供许多其余乏味的性能,例如

  • 其余类型的连贯办法,如 WebSockets;
  • 不同的认证和受权模式;
  • 将数据流传输到数据库;
  • 基于音讯特色的自定义路由规定。

传感器设置

为简略起见,咱们的设施将由一个一般的 Mix 应用程序示意:它能够轻松转换为 Nerves 应用程序。

首先,咱们创立一个 Mix 我的项目:

mix new --sup weather_sensor
cd weather_sensor

为了与 MQTT broker 交互,咱们须要一个 MQTT 客户端。咱们采纳 emqtt。将其增加到 mix.exs 作为依赖项:

defp deps do
  [{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
  ]
end

咱们将把所有的“传感器”代码放到主模块 WeatherSensor 中,所以咱们须要将它增加到应用程序管理器 lib/weather_sensor/application.ex 中:

defmodule WeatherSensor.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [WeatherSensor]

    opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

当初,让咱们在 lib/weather_sensor.ex 中实现主模块:

defmodule WeatherSensor do
  @moduledoc false

  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    interval = Application.get_env(:weather_sensor, :interval)
    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    st = %{
      interval: interval,
      timer: nil,
      report_topic: report_topic,
      pid: pid
    }

    {:ok, set_timer(st), {:continue, :start_emqtt}}
  end

  def handle_continue(:start_emqtt, %{pid: pid} = st) do
    {:ok, _} = :emqtt.connect(pid)

    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    clientid = emqtt_opts[:clientid]
    {:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1})
    {:noreply, st}
  end

  def handle_info(:tick, %{report_topic: topic, pid: pid} = st) do
    report_temperature(pid, topic)
    {:noreply, set_timer(st)}
  end

  def handle_info({:publish, publish}, st) do
    handle_publish(parse_topic(publish), publish, st)
  end

  defp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) do
    new_st = %{st | interval: String.to_integer(payload)}
    {:noreply, set_timer(new_st)}
  end

  defp handle_publish(_, _, st) do
    {:noreply, st}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp set_timer(st) do
    if st.timer do
      Process.cancel_timer(st.timer)
    end
    timer = Process.send_after(self(), :tick, st.interval)
    %{st | timer: timer}
  end

  defp report_temperature(pid, topic) do
    temperature = 10.0 + 2.0 * :rand.normal()
    message = {System.system_time(:millisecond), temperature}
    payload = :erlang.term_to_binary(message)
    :emqtt.publish(pid, topic, payload)
  end
end

并在 config/config.exs 中增加一些选项:

import Config

config :weather_sensor, :emqtt,
  host: '127.0.0.1',
  port: 1883,
  clientid: "weather_sensor",
  clean_start: false,
  name: :emqtt

config :weather_sensor, :interval, 1000

让咱们总结一下 WeatherSensor 中产生的事件:

  • 它实现了 GenServer 行为。
  • 启动时,它有如下动作:

    • 关上一个 MQTT 连贯;
    • 订阅 commands/weather_sensor/set_interval 主题以接管命令,将接管到的数据将通过 :emqtt 发送到过程,作为 {:publish, publish}音讯。
    • 以预约义的工夫距离设置计时器。
  • 在定时器超时时,它公布 {Timestamp, Temperature} 元组到 reports/weather_sensor/temperature 主题。
  • 在收到来自 commands/weather_sensor/set_interval 主题的音讯时,它会更新计时器距离。

因为咱们的应用程序不是真正的 Nerves 应用程序,它连贯了 BMP280 之类的传感器,因而咱们生成温度数据。

在这里咱们曾经能够看到绝对于 HTTP 交互的一个劣势:咱们不仅能够发送数据,还能够实时接管一些命令。

咱们须要一个 broker 来运行节点;咱们稍后会开始。

控制台设置

因为 MQTT 中没有“服务器”,因而咱们的控制台也将是一个 MQTT 客户端。但它会 订阅 reports/weather_sensor/temperature 主题和 公布 命令到 commands/weather_sensor/set_interval。

对于控制台,咱们将设置 Phoenix LiveView 应用程序。

创立过程如下:

mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard

向 mix.exs 增加依赖项

  defp deps do
    [
      ...
      {:jason, "~> 1.2"},
      {:plug_cowboy, "~> 2.5"},

      {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},
      {:contex, github: "mindok/contex"} # We will need this for SVG charts
    ]
  end

向 config/dev.exs 增加一些设置:

config :weather_dashboard, :emqtt,
  host: '127.0.0.1',
  port: 1883

config :weather_dashboard, :sensor_id, "weather_sensor"

# Period for chart
config :weather_dashboard, :timespan, 60

当初咱们生成一个 LiveView 控制器:

mix phx.gen.live Measurements Temperature temperatures  --no-schema --no-context

这会生成很多文件,但并非都是必须的,咱们须要的是一个带有图表的单页应用程序。

rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex

同时从 lib/weather_dashboard_web.ex 中删除 import WeatherDashboardWeb.LiveHelpers。

更新咱们页面的模板(lib/weather_dashboard_web/live/temperature_live/index.html.heex):

<div>
  <%= if @plot do %>
    <%= @plot %>
  <% end %>
</div>

<div>
  <form phx-submit="set-interval">
    <label for="interval">Interval</label>
    <input type="text" name="interval" value={@interval}/>
    <input type="submit" value="Set interval"/>
  </form>
</div>

咱们有一个图表和输出控件,用于向此页面上的“设施”发送命令。

当初更新次要局部 LiveView 控制器(lib/weather_dashboard_web/live/temperature_live/index.ex):

defmodule WeatherDashboardWeb.TemperatureLive.Index do
  use WeatherDashboardWeb, :live_view

  require Logger

  @impl true
  def mount(_params, _session, socket) do
    reports = []
    emqtt_opts = Application.get_env(:weather_dashboard, :emqtt)
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    {:ok, _} = :emqtt.connect(pid)
    # Listen reports
    {:ok, _, _} = :emqtt.subscribe(pid, "reports/#")
    {:ok, assign(socket,
      reports: reports,
      pid: pid,
      plot: nil,
      interval: nil
    )}
  end

  @impl true
  def handle_params(_params, _url, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_event("set-interval", %{"interval" => interval_s}, socket) do
    case Integer.parse(interval_s) do
      {interval, ""} ->
        id = Application.get_env(:weather_dashboard, :sensor_id)
        # Send command to device
        topic = "commands/#{id}/set_interval"
        :ok = :emqtt.publish(socket.assigns[:pid],
          topic,
          interval_s,
          retain: true
        )
        {:noreply, assign(socket, interval: interval)}
      _ ->
        {:noreply, socket}
    end
  end

  def handle_event(name, data, socket) do
    Logger.info("handle_event: #{inspect([name, data])}")
    {:noreply, socket}
  end

  @impl true
  def handle_info({:publish, packet}, socket) do
    handle_publish(parse_topic(packet), packet, socket)
  end

  defp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) do
    if id == Application.get_env(:weather_dashboard, :sensor_id) do
      report = :erlang.binary_to_term(payload)
      {reports, plot} = update_reports(report, socket)
      {:noreply, assign(socket, reports: reports, plot: plot)}
    else
      {:noreply, socket}
    end
  end

  defp update_reports({ts, val}, socket) do
    new_report = {DateTime.from_unix!(ts, :millisecond), val}
    now = DateTime.utc_now()
    deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)
    reports =
      [new_report | socket.assigns[:reports]]
      |> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)
      |> Enum.sort()

    {reports, plot(reports, deadline, now)}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp plot(reports, deadline, now) do
    x_scale =
      Contex.TimeScale.new()
      |> Contex.TimeScale.domain(deadline, now)
      |> Contex.TimeScale.interval_count(10)

    y_scale =
      Contex.ContinuousLinearScale.new()
      |> Contex.ContinuousLinearScale.domain(0, 30)

    options = [
      smoothed: false,
      custom_x_scale: x_scale,
      custom_y_scale: y_scale,
      custom_x_formatter: &x_formatter/1,
      axis_label_rotation: 45
    ]

    reports
    |> Enum.map(fn {dt, val} -> [dt, val] end)
    |> Contex.Dataset.new()
    |> Contex.Plot.new(Contex.LinePlot, 600, 250, options)
    |> Contex.Plot.to_svg()
  end

  defp x_formatter(datetime) do
    datetime
    |> Calendar.strftime("%H:%M:%S")
  end

end

特地阐明如下:

  • 咱们创立了一个 LiveView 处理程序来为咱们的应用程序的主页提供服务。
  • 通常,Phoenix.PubSub 用于更新 LiveView 过程状态。然而,咱们做了一个非凡设置:因为 MQTT broker 曾经提供了一个公布订阅模式,咱们间接从 LiveView 过程连贯到它。
  • 收到新的温度数据后,服务器更新温度图表。
  • 收到用户的表单更新后,咱们会向命令主题发送更新的工夫距离。

最初,在 lib/weather_dashboard_web/router.ex 中设置路由,以便咱们的控制器可能解决根页面:

  scope "/", WeatherDashboardWeb do
    pipe_through :browser

    live "/", TemperatureLive.Index
  end

模块集成

当初,咱们已筹备设置并运行所有内容。

咱们运行一个 MQTT broker。因为咱们不须要任何特定的设置,最简略的办法是应用 docker 运行代 broker。

docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10

当初运行咱们的“设施”:

cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2]  [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

....

13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}

13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>

13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}

13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...

咱们看到咱们的传感器立刻开始发送报告。

当初运行咱们的控制台:

cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2]  [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

[info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at http://localhost:4000
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...

让咱们导航到 http://localhost:4000。

咱们看到相应的 LiveView 过程挂载,连贯到代理,并开始接管温度数据:

[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129µs
  Transport: :websocket
  Serializer: Phoenix.Socket.V2.JSONSerializer
  Parameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "http://localhost:4000/assets/app.css", "1" => "http://localhost:4000/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,
  104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,
  114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...

此外,该页面立刻开始更新:

如果咱们更新距离,咱们看到设施节点立刻收到命令并开始更频繁地更新:

当初咱们演示一件重要的事件:让咱们进行咱们的“设施”节点,稍等片刻,而后重新启动它。咱们看到节点持续以更新的频率发送数据。

怎么会这样?其实很简略,秘诀就在于咱们发送到命令主题的命令音讯的 retain 标记。

:ok = :emqtt.publish(socket.assigns[:pid],
  topic,
  interval_s,
  retain: true
)

当咱们向主题发送带有 retain 标记的音讯时,该音讯也成为“默认”音讯,并保留在 broker 上。该主题的每个订阅者都会在订阅时收到此音讯。

对于可能常常离线且没有任何易于应用的本地存储来放弃其状态的嵌入式设施,此性能十分重要。这是在连贯时正确配置它们的办法。

论断

这篇文章介绍了如下内容:

  • 展现了一种与嵌入式设施交互的风行形式——MQTT 协定;
  • 咱们介绍了它在 Elixir 中的用法;
  • 咱们还展现了 MQTT 的一些劣势,例如公布订阅模式和音讯保留。

即便在简略的设置中,咱们也可能想要应用的弱小性能是:

  • 将主题数据流传输到数据库中,这样咱们能够显示连贯历史,无需“手动”保留;
  • 应用 MQTT.js 通过 WebSockets 从前端间接连贯到 broker。

所有代码都能够在 https://github.com/savonarola/mqtt-article 上查阅。

退出移动版