关于python:实战案例丨分布式系统中如何用python实现Paxos

61次阅读

共计 19876 个字符,预计需要花费 50 分钟才能阅读完成。

摘要:提到分布式算法,就不得不提 Paxos 算法,在过来几十年里,它基本上是分布式共识的代 名词,因为以后最罕用的一批共识算法都是基于它改良的。比方,Fast Paxos 算法、Cheap Paxos 算法、Raft 算法、ZAB 协定等等。

本文分享自华为云社区《实战分布式系统 -python 实现 Paxos》,原文作者:Leo Xiao。

一致性算法背景:Paxos

  1. 一致性算法解决的问题:分布式系统中数据不能存在单个节点(主机)上,否则可能呈现单点故障;多个节点(主机)须要保障具备雷同的数据。
  2. 什么是一致性:一致性就是数据保持一致,在分布式系统中,能够了解为多个节点中数据的值是统一的。
  3. 一致性模型分类:个别分为强一致性和弱一致性,强一致性保证系统扭转提交当前立刻扭转集群的状态。常见模型包含:Paxos,Raft(muti-paxos),ZAB(muti-paxos);弱一致性也叫最终一致性,零碎不保障扭转提交当前立刻扭转集群的状态,然而随着工夫的推移最终状态统一的。常见模型包含:DNS 零碎,Gossip 协定
  4. 一致性算法应用案例: Google 的 Chubby 分布式锁服务,采纳了 Paxos 算法;etcd 分布式键值数据库,采纳了 Raft 算法;ZooKeeper 分布式应用协调服务以及 Chubby 的开源实现,采纳 ZAB 算法
  • simple-paxos 就单个动态值达一致性自身并不实用,咱们须要实现的集群零碎(银行账户服务)心愿就随工夫变动的特定状态(账户余额)达成统一。所以须要应用 Paxos 就每个操作达成统一,将每个批改视为状态机转换。
  • Multi-Paxos 实际上是 simple Paxos 实例(插槽)的序列,每个实例都按程序编号。每个状态转换都被赋予一个“插槽编号”,集群的每个成员都以严格的数字程序执行转换。为了更改群集的状态(例如,解决传输操作),咱们尝试在下一个插槽中就该操作达成一致性。具体来说,这意味着向每个音讯增加一个插槽编号,并在每个插槽的根底上跟踪所有协定状态。
  • 为每个插槽运行 Paxos,至多两次往返会太慢。Multi-Paxos 通过对所有插槽应用雷同的选票编号集进行优化,并同时对所有插槽执行 Prepare/Promise。
Client   Proposer      Acceptor     Learner
   |         |          |  |  |       |  | --- First Request ---
   X-------->|          |  |  |       |  |  Request
   |         X--------->|->|->|       |  |  Prepare(N)
   |         |<---------X--X--X       |  |  Promise(N,I,{Va,Vb,Vc})
   |         X--------->|->|->|       |  |  Accept!(N,I,V)
   |         |<---------X--X--X------>|->|  Accepted(N,I,V)
   |<---------------------------------X--X  Response
   |         |          |  |  |       |  |

Paxos 实现

在实用软件中实现 Multi-Paxos 是出了名的艰难,催生了许多论文如 ”Paxos Made Simple”,“Paxos Made Practical”

  • 首先,multi-poposer 在忙碌的环境中可能会成为问题,因为每个群集成员都试图在每个插槽中决定其状态机操作。解决办法是选举一名“leader”,负责为每个时段提交选票。所有其余群集节点将新操作发送到领导者执行。因而,在只有一名领导人的失常运作中,不会产生投票抵触。

Prepare/Promise 阶段能够作为一种 leader 选举:无论哪个集群成员领有最近承诺的选票号码,都被视为 leader。leader 后续能够自在地间接执行 Accept/Accepted 阶段,而不反复第一阶段。咱们将在下文看到的,leader 选举实际上是相当简单的。

尽管 simple Paxos 保障集群不会达成抵触的决定,但它不能保障会做出任何决定。例如,如果初始的 Prepare 音讯失落,并且没有达到接受者,则提议者将期待永远不会达到的 Promise 音讯。解决这个问题须要精心设计的从新传输:足以最终获得停顿,但不会群集产生数据包风暴。

  • 另一个问题是决定的流传。在失常状况下,简略地播送 Decision 信息就能够解决这个问题。然而,如果音讯失落,节点可能会永远不晓得该决定,并且无奈为当前的插槽利用状态机转换。所以实现须要一些机制来共享无关已决定提案的信息。

应用分布式状态机带来了另一个挑战:当新节点启动时,它须要获取群集的现有状态。
尽管能够通过赶上第一个插槽以来的所有插槽的决策来做到这一点,但在一个大的集群中,这可能波及数百万个插槽。此外,咱们须要一些办法来初始化一个新的群集。

集群库介绍

后面都是实践介绍,上面咱们应用 python 来实现一个简化的 Multi-Paxos

业务场景和痛点

咱们以简略的银行账户治理服务的场景作为案例。在这个服务中,每一个账户都有一个以后余额,同时每个账户都有本人的账号。用户能够对账户进行“贷款”、“转账”、“查问以后余额”等操作。“转账”操作同时波及了两个账户:转出账户和转入账户,如果账户余额有余,转账操作必须被驳回。

  • 如果这个服务仅仅在一个服务器上部署,很容易就可能实现:应用一个操作锁来确保“转账”操作不会同时进行,同时对转出账户的进行校验。然而,银行不可能仅仅依赖于一个服务器来贮存账户余额这样的要害信息,通常,这些服务都是被散布在多个服务器上的,每一个服务器各自运行着雷同代码的实例。用户能够通过任何一个服务器来操作账户。
  • 在一个简略的分布式解决零碎的实现中,每个服务器都会保留一份账户余额的正本。它会解决任何收到的操作,并且将账户余额的更新发送给其余的服务器。然而这种办法有一个重大的问题:如果两个服务器同时对一个账户进行操作,哪一个新的账户余额是正确的?即便服务器不共享余额而是共享操作,对一个账户同时进行转账操作也可能造成透支。
  • 从根本上来说,这些谬误的产生都是因为服务器应用它们本地状态来响应操作,而不是首先确保本地状态与其余服务器相匹配。比方,设想服务器 A 接到了从账号 101 向账号 202 转账的操作指令,而此时服务器 B 曾经解决了另一个把账号 101 的钱都转到账号 202 的申请,却没有告诉服务器 A。这样,服务器 A 的本地状态与服务器 B 不一样,即便会造成账户 101 透支,服务器 A 仍然容许从账号 101 进行转账操作。

分布式状态机

为了避免上述情况产生咱们采纳了一种叫做“分布式状态机”的工具。它的思路是对每个同样的输出,每个服务器都运行同样的对应的状态机。因为状态机的个性,对于同样的输出每个服务器的输入都是一样的。对于像“转账”、“查问以后余额”等操作,账号和余额也都是状态机的输出。

这个利用的状态机比较简单:

 def execute_operation(state, operation):
     if operation.name == 'deposit':
         if not verify_signature(operation.deposit_signature):
         return state, False
         state.accounts[operation.destination_account] += operation.amount
         return state, True
     elif operation.name == 'transfer':
         if state.accounts[operation.source_account] < operation.amount:
             return state, False
             state.accounts[operation.source_account] -= operation.amount
         state.accounts[operation.destination_account] += operation.amount
         return state, True
     elif operation.name == 'get-balance':
     return state, state.accounts[operation.account]

值得注意的是,运行“查问以后余额”操作时尽管并不会扭转以后状态,然而咱们仍然把它当做一个状态变动操作来实现。这确保了返回的余额是分布式系统中的最新信息,并且不是基于一个服务器上的本地状态来进行返回的。

这可能跟你在计算机课程中学习到的典型的状态机不太一样。传统的状态机是一系列无限个状态的汇合,每个状态都与一个标记的转移行为绝对应,而在本文中,状态机的状态是账户余额的汇合,因而存在无穷多个可能的状态。然而,状态机的根本规定同样实用于本文的状态机:对于同样的初始状态,同样的输出总是有同样的输入。

因而,分布式状态机确保了对于同样的操作,每个主机都会有同样的相应。然而,为了确保每个服务器都容许状态机的输出,前文中提到的问题仍然存在。这是一个一致性问题,为了解决它咱们采纳了一种派生的 Paxos 算法。

外围需要

  1. 能够为较大的应用程序提供一致性服务: 咱们用一个 Cluster 库来实现简化的 Multi-Paxos
  2. 正确性是这个库最重要的能力,因而结构化代码是很重要的,以便咱们能够看到并测试它与标准的对应关系。
  3. 简单的协定可能会呈现简单的故障,因而咱们将构建对复现和调试不常见的故障的反对。
  4. 咱们会实现 POC 代码:足以证实外围概念是实用的,代码的结构化是为了后续增加此性能对外围实现的更改最小
    咱们开始 coding 吧。

类型和常量

cluster 中的协定须要应用 15 不同的音讯类型,每种音讯类型应用 collection 中的 namedturple 定义:

 Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
    Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
    Decision = namedtuple('Decision', ['slot', 'proposal'])
    Invoked = namedtuple('Invoked', ['client_id', 'output'])
    Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value'])
    Join = namedtuple('Join', [])
    Active = namedtuple('Active', [])
    Prepare = namedtuple('Prepare', ['ballot_num'])
    Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
    Propose = namedtuple('Propose', ['slot', 'proposal'])
    Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions'])
    Decided = namedtuple('Decided', ['slot'])
    Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
    Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
    Accepting = namedtuple('Accepting', ['leader'])

应用命名元组形容每种音讯类型能够放弃代码的 clean,并有助于防止一些简略的谬误。如果命名元组构造函数没有被赋予正确的属性,则它将引发异样,从而使谬误变得显著。元组在日志音讯中 k 能够很好地格式化,不会像字典那样应用那么多的内存。
创立音讯:

 msg = Accepted(slot=10, ballot_num=30)

拜访音讯:

 got_ballot_num = msg.ballot_num

前面咱们会理解这些音讯的含意。
代码还引入了一些常量,其中大多数常量定义了各种音讯的超时:

 JOIN_RETRANSMIT = 0.7
    CATCHUP_INTERVAL = 0.6
    ACCEPT_RETRANSMIT = 1.0
    PREPARE_RETRANSMIT = 1.0
    INVOKE_RETRANSMIT = 0.5
    LEADER_TIMEOUT = 1.0
    NULL_BALLOT = Ballot(-1, -1)  # sorts before all real ballots
    NOOP_PROPOSAL = Proposal(None, None, None)  # no-op to fill otherwise empty slots

最初咱们须要定义协定中的 Proposal 和 Ballot

 Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input'])
    Ballot = namedtuple('Ballot', ['n', 'leader'])

组件模型

实现 multi-paxos 的外围组件包含 Role 和 Node。

  • 为了保障可测试性并放弃代码的可读性,咱们将 Cluster 合成为与协定中形容的角色绝对应的几个类。每个都是 Role 的子类。
class Role(object):

    def __init__(self, node):
        self.node = node
        self.node.register(self)
        self.running = True
        self.logger = node.logger.getChild(type(self).__name__)

    def set_timer(self, seconds, callback):
        return self.node.network.set_timer(self.node.address, seconds,
                                           lambda: self.running and callback())

    def stop(self):
        self.running = False
        self.node.unregister(self)

群集节点的角色由 Node 类粘在一起,该类代表网络上的单个节点。在程序过程中角色将增加到节点中,并从节点中删除。

达到节点的音讯将中继到所有流动角色,调用以音讯类型命名的办法,前缀为 do_。这些 do_办法接管音讯的属性作为关键字参数,以便于拜访。Node类还提供了send 办法作为不便,应用 functools.partial 为 Network 类的雷同办法提供一些参数。

class Node(object):
    unique_ids = itertools.count()

    def __init__(self, network, address):
        self.network = network
        self.address = address or 'N%d' % self.unique_ids.next()
        self.logger = SimTimeLogger(logging.getLogger(self.address), {'network': self.network})
        self.logger.info('starting')
        self.roles = []
        self.send = functools.partial(self.network.send, self)

    def register(self, roles):
        self.roles.append(roles)

    def unregister(self, roles):
        self.roles.remove(roles)

    def receive(self, sender, message):
        handler_name = 'do_%s' % type(message).__name__

        for comp in self.roles[:]:
            if not hasattr(comp, handler_name):
                continue
            comp.logger.debug("received %s from %s", message, sender)
            fn = getattr(comp, handler_name)
            fn(sender=sender, **message._asdict())

利用接口

每个集群成员上都会创立并启动一个 Member 对象,提供特定于应用程序的状态机和对等项列表。如果成员对象正在退出现有集群,则该成员对象向该节点增加 bootstrap 角色,如果正在创立新集群,则该成员对象增加 seed。再用 Network.run 在独自的线程中运行协定。

应用程序通过该 invoke 办法与集群进行交互,从而启动了状态转换,确定该提议并运行状态机后,invoke 将返回状态机的输入。该办法应用简略的同步 Queue 来期待协定线程的后果。

class Member(object):

    def __init__(self, state_machine, network, peers, seed=None,
                 seed_cls=Seed, bootstrap_cls=Bootstrap):
        self.network = network
        self.node = network.new_node()
        if seed is not None:
            self.startup_role = seed_cls(self.node, initial_state=seed, peers=peers,
                                      execute_fn=state_machine)
        else:
            self.startup_role = bootstrap_cls(self.node,
                                      execute_fn=state_machine, peers=peers)
        self.requester = None

    def start(self):
        self.startup_role.start()
        self.thread = threading.Thread(target=self.network.run)
        self.thread.start()

    def invoke(self, input_value, request_cls=Requester):
        assert self.requester is None
        q = Queue.Queue()
        self.requester = request_cls(self.node, input_value, q.put)
        self.requester.start()
        output = q.get()
        self.requester = None
        return output

Role 类

Paxos 协定中的角色包含:client, acceptor, proposer, learner, and leader。在典型的实现中,单个 processor 能够同时表演一个或多个角色。这不会影响协定的正确性,通常会合并角色以改善协定中的提早和 / 或音讯数量。

上面逐个实现每个角色类

Acceptor

Acceptor 类实现的是 Paxos 中的 acceptor 角色,所以必须存储最近 promise 的选票编号,以及每个时段承受的各个 slot 的 proposal,同时须要相应 Prepare 和 Accept 音讯。这里的 POC 实现是一个和协定能够间接对应的短类,对于 acceptor 来说 Multi-paxos 看起来像是简略的 Paxos,只是在 message 中增加了 slot number。

class Acceptor(Role):

    def __init__(self, node):
        super(Acceptor, self).__init__(node)
        self.ballot_num = NULL_BALLOT
        self.accepted_proposals = {}  # {slot: (ballot_num, proposal)}

    def do_Prepare(self, sender, ballot_num):
        if ballot_num > self.ballot_num:
            self.ballot_num = ballot_num
            # we've heard from a scout, so it might be the next leader
            self.node.send([self.node.address], Accepting(leader=sender))

        self.node.send([sender], Promise(
            ballot_num=self.ballot_num, 
            accepted_proposals=self.accepted_proposals
        ))

    def do_Accept(self, sender, ballot_num, slot, proposal):
        if ballot_num >= self.ballot_num:
            self.ballot_num = ballot_num
            acc = self.accepted_proposals
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

        self.node.send([sender], Accepted(slot=slot, ballot_num=self.ballot_num))

Replica

Replica 类是 Role 类最简单的子类,对应协定中的 Learner 和 Proposal 角色,它的主要职责是:提出新的 proposal;在决定 proposal 时调用本地状态机;跟踪以后 Leader;以及将新启动的节点增加到集群中。

Replica 创立新的 proposal 以响应来自客户端的“invoke”音讯,抉择它认为是未应用的插槽,并向以后 leader 发送“Propose”音讯。如果选定插槽的共识是针对不同 proposal,则 replica 必须应用新插槽 re-propose。

下图显示 Replica 的角色管制流程:

Requester    Local Rep   Current Leader
   X---------->|             |    Invoke
   |           X------------>|    Propose
   |           |<------------X    Decision
   |<----------X             |    Decision
   |           |             | 

Decision 音讯示意集群已达成共识的插槽,Replica 类存储新的决定并运行状态机,直到达到未确定的插槽。Replica 从本地状态机已解决的提交的 slot 辨认出集群已批准的已决定的 slot。如果 slot 呈现乱序,提交的提案可能会滞后,期待下一个空位被决定。提交 slot 后,每个 replica 会将操作后果发送回一条 Invoked 音讯给请求者。

在某些状况下 slot 可能没有无效的提案,也没有决策, 须要状态机一个接一个地执行 slot,因而群集必须就填充 slot 的内容达成共识。为了防止这种可能性,Replica 在遇到插槽时会提出“no-op”的 proposal。如果最终决定了这样的 proposal,则状态机对该 slot 不执行任何操作。

同样,同一 proposal 有可能被 Decision 两次。对于任何此类反复的 proposal,Replica 将跳过调用状态机,而不会对该 slot 执行任何状态转换。

Replicas 须要晓得哪个节点是 active leader 能力向其发送 Propose 音讯,要实现这一指标,每个正本都应用三个信息源跟踪 active leader。

当 leader 的角色转换为 active 时,它会向同一节点上的正本发送一条 Adopted 音讯(下图):

Leader    Local Repplica   
   X----------->|          Admopted

当 acceptor 角色向 Promise 新的 leader 发送 Accepting 音讯时,它将音讯发送到其本地正本(下图)。

Acceptor    Local Repplica   
   X----------->|          Accepting

active leader 将以心跳的模式发送 Active 音讯。如果在 LEADER_TIMEOUT 到期之前没有此类音讯达到,则 Replica 将假设该 Leader 已死,并转向下一个 Leader。在这种状况下,重要的是所有正本都抉择雷同的新领导者,咱们能够通过对成员进行排序并在列表中抉择下一个 leader。

当节点退出网络时,Bootstrap 将发送一条 Join 音讯(下图)。Replica 以一条 Welcome 蕴含其最新状态的音讯作为响应,从而使新节点可能疾速启用。

BootStrap     Replica        Replica       Replica
     X---------->|             |             |    Join
     |<----------X             X             |    Welcome
     X------------------------>|             |    Join
     |<------------------------X             |    Welcome
     X-------------------------------------->|    Join
     |<--------------------------------------X    Welcome      
class Replica(Role):

    def __init__(self, node, execute_fn, state, slot, decisions, peers):
        super(Replica, self).__init__(node)
        self.execute_fn = execute_fn
        self.state = state
        self.slot = slot
        self.decisions = decisions
        self.peers = peers
        self.proposals = {}
        # next slot num for a proposal (may lead slot)
        self.next_slot = slot
        self.latest_leader = None
        self.latest_leader_timeout = None

    # making proposals

    def do_Invoke(self, sender, caller, client_id, input_value):
        proposal = Proposal(caller, client_id, input_value)
        slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None)
        # propose, or re-propose if this proposal already has a slot
        self.propose(proposal, slot)

    def propose(self, proposal, slot=None):
        """Send (or resend, if slot is specified) a proposal to the leader"""
        if not slot:
            slot, self.next_slot = self.next_slot, self.next_slot + 1
        self.proposals[slot] = proposal
        # find a leader we think is working - either the latest we know of, or
        # ourselves (which may trigger a scout to make us the leader)
        leader = self.latest_leader or self.node.address
        self.logger.info("proposing %s at slot %d to leader %s" % (proposal, slot, leader))
        self.node.send([leader], Propose(slot=slot, proposal=proposal))

    # handling decided proposals

    def do_Decision(self, sender, slot, proposal):
        assert not self.decisions.get(self.slot, None), 
                "next slot to commit is already decided"
        if slot in self.decisions:
            assert self.decisions[slot] == proposal, 
                "slot %d already decided with %r!" % (slot, self.decisions[slot])
            return
        self.decisions[slot] = proposal
        self.next_slot = max(self.next_slot, slot + 1)

        # re-propose our proposal in a new slot if it lost its slot and wasn't a no-op
        our_proposal = self.proposals.get(slot)
        if (our_proposal is not None and 
            our_proposal != proposal and our_proposal.caller):
            self.propose(our_proposal)

        # execute any pending, decided proposals
        while True:
            commit_proposal = self.decisions.get(self.slot)
            if not commit_proposal:
                break  # not decided yet
            commit_slot, self.slot = self.slot, self.slot + 1

            self.commit(commit_slot, commit_proposal)

    def commit(self, slot, proposal):
        """Actually commit a proposal that is decided and in sequence"""
        decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot]
        if proposal in decided_proposals:
            self.logger.info("not committing duplicate proposal %r, slot %d", proposal, slot)
            return  # duplicate

        self.logger.info("committing %r at slot %d" % (proposal, slot))
        if proposal.caller is not None:
            # perform a client operation
            self.state, output = self.execute_fn(self.state, proposal.input)
            self.node.send([proposal.caller], 
                Invoked(client_id=proposal.client_id, output=output))

    # tracking the leader

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.latest_leader = self.node.address
        self.leader_alive()

    def do_Accepting(self, sender, leader):
        self.latest_leader = leader
        self.leader_alive()

    def do_Active(self, sender):
        if sender != self.latest_leader:
            return
        self.leader_alive()

    def leader_alive(self):
        if self.latest_leader_timeout:
            self.latest_leader_timeout.cancel()

        def reset_leader():
            idx = self.peers.index(self.latest_leader)
            self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
            self.logger.debug("leader timed out; tring the next one, %s", 
                self.latest_leader)
        self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)

    # adding new cluster members

    def do_Join(self, sender):
        if sender in self.peers:
            self.node.send([sender], Welcome(state=self.state, slot=self.slot, decisions=self.decisions))

Leader Scout Commander

Leader 的次要工作是承受 Propose 要求新投票的音讯并做出决定。胜利实现协定的 Prepare/Promise 局部后 Leader 将处于“Active 状态”。沉闷的 Leader 能够立刻发送 Accept 音讯以响应 Propose。

与按角色分类的模型保持一致,Leader 会委派 scout 和 Commander 角色来执行协定的每个局部。

class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # reminder others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1, 
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")

Leader 想要变为活动状态时会创立一个 Scout 角色,以响应 Propose 在其处于非活动状态时收到音讯(下图),Scout 发送(并在必要时从新发送)Prepare 音讯,并收集 Promise 响应,直到听到音讯为止。少数同行或直到被抢占为止。在通过 Adopted 或 Preempted 回复给 Leader。

Leader    Scout      Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Prepare
   |          |<---------X            |           |    Promise
   |          X---------------------->|           |    Prepare
   |          |<----------------------X           |    Promise
   |          X---------------------------------->|    Prepare
   |          |<----------------------------------X    Promise
   |<---------X          |            |           |    Adopted
class Scout(Role):
 def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

    def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals =  
                    dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  # Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                    Adopted(ballot_num=ballot_num, 
                            accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address], 
                Preempted(slot=None, preempted_by=ballot_num))
            self.stop()

Leader 为每个有 active proposal 的 slot 创立一个 Commander 角色(下图)。像 Scout 一样,Commander 发送和从新发送 Accept 音讯,并期待大多数接受者的回复 Accepted 或抢占音讯。承受倡议后,Commander 将 Decision 音讯播送到所有节点。它用 Decided 或 Preempted 响应 Leader。

Leader    Commander   Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Accept
   |          |<---------X            |           |    Accepted
   |          X---------------------->|           |    Accept
   |          |<----------------------X           |    Accepted
   |          X---------------------------------->|    Accept
   |          |<----------------------------------X    Accepted
   |<---------X          |            |           |    Decided
class Commander(Role):
 def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address], 
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address], 
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)

有一个问题是后续会介绍的网络模拟器甚至在节点内的音讯上也引入了数据包失落。当所有 Decision 音讯失落时,该协定无奈持续进行。Replica 持续从新传输 Propose 音讯,然而 Leader 疏忽了这些音讯,因为它曾经对该 slot 提出了 proposal,因为没有 Replica 收到 Decision 所以 Replica 的 catch 过程找不到后果,解决方案是像理论网络堆栈以东洋确保本地音讯始终传递胜利。

Bootstrap

node 退出 cluster 时必须获取以后的 cluster 状态,Bootstrap role 循环每个节点发送 join 音讯,晓得收到 Welcome, Bootstrap 的时序图如下所示:

如果在每个 role(replica,leader,acceptor)中实现启动过程,并期待 welcome 音讯,会把初始化逻辑扩散到每个 role,测试起来会十分麻烦,最终,咱们决定增加 bootstrap role,一旦启动实现,就给 node 增加每个 role,并且将初始状态传递给他们的构造函数。

class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
        self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()

参考:

  • http://aosabook.org/en/500L/c…
  • https://lamport.azurewebsites…
  • https://lamport.azurewebsites…
  • https://www.scs.stanford.edu/…
  • https://www.researchgate.net/…
  • https://www.paxos.com/
  • https://www.cs.cornell.edu/co…
  • https://en.wikipedia.org/wiki…
  • https://ongardie.net/static/r…
  • https://zhuanlan.zhihu.com/p/…

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0