明天来实现服务器的第一个部件 - beacon_server。
性能解析
为了建设Elixir集群,须要所有 Beam 节点在启动之时就曾经晓得一个固定的节点用来连贯,之后 Beam 会主动实现节点之间的链接,即默认的全连贯
模式,所有节点两两之间均有连贯。对于这一点我还没有深刻思考过有没有必要进行调整,之后看状况再说
因而,为了让服务器集群内的所有节点在启动时都可能连贯一个固定节点从而组成集群,这个固定节点就是beacon_server
。
beacon_server
须要有什么性能呢?在通过一番简略思考后,至多须要具备以下几个性能:
- 承受其余节点的连贯
- 承受其余节点的注册信息
- 相应其余节点的需要,返回需要节点的信息
这里有两个重要概念:资源(Resource)
和 需要(Requirement)
。资源
指某个节点本身的内容类型,也就是在集群中所处的角色,比方网关服务器的资源就是网关(gate_server);需要
指某个节点须要的其余节点,比方网关节点须要网关治理节点(gate_manager)来注册本人,数据服务节点须要数据分割节点(data_contact)来把数据库同步到本身。
当一个节点向beacon_server
节点注册时,咱们心愿它可能向beacon_server
提供本人的节点名称、资源、需要等数据,不便beacon_server
在收到别的节点注册时,可能把曾经注册过的节点当做需要返回给别的节点。
数据结构
我用一个 GenServer
线程负责下面所说的所有工作,利用线程的 state
来保留来往节点信息。以后粗略想了想,权且定义信息存储格局如下:
%{ nodes: %{ "node1@host": :online, "node2@host": :offline }, requirements: [ %{ module: Module.Interface, name: [:requirement_name], node: :"node@host" } ], resources: [ %{ module: Module.Interface, name: :resoutce_name, node: :"node@host" } ]}
我用一个字典存储所有信息,分为 nodes
、requirements
以及resources
三局部。
nodes
存储所有曾经连贯的节点和他们的状态,:online
示意在线失常连贯,:offline
示意节点断开连接;
requirements
存储每个节点注册时提供的需要信息。应用列表存储,列表中每个项代表一个节点。项应用字典,存储模块(module)、名称(name)、节点(node)信息。其中名称
字段,因为有些节点可能会有不只一个需要
,因而应用列表存储。模块
字段是为了留着以备后用,目前没什么用……节点
字段用于获取的节点应用该字段对指标节点发送音讯,必不可少。
resources
存储每个节点注册时提供的资源信息,字段与requirements
完全相同,有一个不同的中央是名称
字段的数据类型不再是列表,而是原子,因为每个节点只可能属于惟一的一种资源,不可能属于两种以上,因而用一个繁多的原子就能够代表了。
简要实现
建设我的项目
这是第一个实现,在实现之前,咱们先建设一个umbrella
我的项目,用来寄存之后的所有代码:
mix new cluster --umbrella
而后创立本节的beacon_server
我的项目:
cd apps/mix new beacon_server --sup
--sup
用来生成监督树。
有了我的项目之后,咱们须要建设一个GenServer
,用来充当其余节点用来通信的接口,咱们就把他叫做Beacon
好了。
性能函数
依据后面的构想,咱们须要上面这么几个函数:
- register(credentials, state) - 用于把注册来的节点信息记录在
state
中,并将新的state
返回。 - get_requirements(node, requirements, resources) - 用于向已注册的节点返回其需要。
上面贴上我粗略实现的代码,当然这不会是最终版本,将来还有优化的空间:
@spec register({node(), module(), atom(), [atom()]}, map()) :: {:ok, map()}defp register( {node, module, resource, requirement}, state = %{nodes: connected_nodes, resources: resources, requirements: requirements} ) do Logger.debug("Register: #{node} | #{resource} | #{inspect(requirement)}") {:ok, %{ state | nodes: add_node(node, connected_nodes), resources: add_resource(node, module, resource, resources), requirements: if requirement != [] do add_requirement(node, module, requirement, requirements) else requirements end } }end@spec get_requirements(node(), list(map()), list(map())) :: list(map())defp get_requirements(node, requirements, resources) do req = find_requirements(node, requirements) offer = find_resources(req, resources) offerend
下面代码中用到的其余公有函数我就不贴了,总之就是利用线程 state
中的数据返回新的数据。
除了这两个必要的函数,我还想增加两个可能监控节点通断的函数。这两个函数通过 handle_info
实现。首先须要在线程初始化的时候开启这项性能:
:net_kernel.monitor_nodes(true)
之后实现两个 callback:
# ========== Node monitoring ==========@impl truedef handle_info({:nodeup, node}, state) do Logger.debug("Node connected: #{node}") {:noreply, state}end@impl truedef handle_info({:nodedown, node}, state = %{nodes: node_list}) do Logger.critical("Node disconnected: #{node}") {:noreply, %{state | nodes: %{node_list | node => :offline}}}end
不在 :nodeup
回调中将节点状态批改为 :online
是因为节点在注册的时候,注册函数曾经将节点的状态批改为 :online
了。
接口函数
有了性能之后,还须要提供对外接口,GenServer
曾经提供了相干的回调函数供咱们实现,在这里我应用 handle_call/3
,因为注册流程须要是同步的,只有注册实现之后对应节点能力开始失常运行。
同样地,对外接口也是两个,别离是 :register
和 :get_requirements
:
@impl true# Register node with resource and requirement.def handle_call( {:register, credentials}, _from, state ) do Logger.info("New register from #{inspect(credentials, pretty: true)}.") {:ok, new_state} = register(credentials, state) Logger.info("Register #{inspect(credentials, pretty: true)} complete.", ansi_color: :green) {:reply, :ok, new_state}end@impl true# Reply to caller node with specified requirementsdef handle_call( {:get_requirements, node}, _from, state = %{nodes: _, resources: resources, requirements: requirements} ) do Logger.debug("Getting requirements for #{inspect(node)}") offer = get_requirements(node, requirements, resources) {:reply, case length(offer) do 0 -> nil _ -> Logger.info("Requirements retrieved: #{inspect(offer, pretty: true)}", ansi_color: :green) {:ok, offer} end, state}end
至此,Beacon
功能模块就根本残缺了,最初咱们须要把它退出到监督树里使其运行起来。在 application.ex
中:
def start(_type, _args) do children = [ # Starts a worker by calling: BeaconServer.Worker.start_link(arg) {BeaconServer.Beacon, name: BeaconServer.Beacon} ] # See https://hexdocs.pm/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: BeaconServer.Supervisor] Supervisor.start_link(children, opts)end
像这样把 Beacon
模块退出到监督者的子线程列表中,beacon_server
临时就算实现了。
成果测试
运行一下试试:
iex --name beacon1@127.0.0.1 --cookie mmo -S mix
为了让其余节点连贯,name
和 cookie
肯定好设置好。
我写了点测试代码调用一下试试:
最初咱们看一下 Beacon
模块的 state
长什么样:
就先这样,前面咱们会在此基础上持续实现别的服务器。