关于程序员:分布式事务的21种武器-5

2次阅读

共计 9050 个字符,预计需要花费 23 分钟才能阅读完成。

在分布式系统中,事务的解决散布在不同组件、服务中,因而分布式事务的 ACID 保障面临着一些非凡难点。本系列文章介绍了 21 种分布式事务设计模式,并剖析其实现原理和优缺点,在面对具体分布式事务问题时,能够抉择适合的模式进行解决。原文: Exploring Solutions for Distributed Transactions (5)

在不同业务场景下,能够有不同的解决方案,常见办法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后盾队列的异步解决(Using Queues to Process Asynchronously in the Background)
  4. TCC 弥补(TCC Compensation Matters)
  5. 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ 事务(MQ Transaction)
  7. Saga 模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 工夫戳排序(Timestamp Ordering)
  15. 乐观并发管制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发管制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍一致性算法、工夫戳排序以及乐观并发管制三种模式。

13. 一致性算法(Consensus Algorithms)

  • 零碎中的所有节点都批准最终后果或决策。
  • 波及如下步骤:

    1. 提议(Proposal) —— 一个节点向零碎中的其余节点提出一个值
    2. 播送(Broadcast) —— 将倡议的值播送到零碎中所有节点
    3. 确认(Acknowledgment) —— 每个节点确认提议并向提议者返回确认音讯
    4. 决定(Decision) —— 一旦提议者收到大多数节点的确认,就能够对提议的值做出决定
    5. 承诺(Commitment) —— 零碎中所有节点提交决定
  • Paxos 算法还包含以下几个步骤:

    1. 筹备阶段(Prepare phase) —— 提议者选定提案,并向零碎中的大多数节点发送筹备申请
    2. 承诺阶段(Promise phase) —— 如果某个节点收到的筹备音讯序列号高于之前看到的序列号,将以承诺响应,并不再承受任何低于该序列号的提案
    3. 承受阶段(Accept phase) —— 如果提议者收到大多数节点的承诺,j 就向大多数节点发送带有提案序列号的 ” 承受 ” 申请
    4. 已承受阶段(Accepted phase) —— 如果某个节点收到的承受音讯序列号高于以前见过的任何承受申请序列号,就承受该提案,并向提议者和所有其余节点发送承受音讯
    5. 学习阶段(Learn phase) —— 一旦提议者从大多数节点接管到被承受的音讯,就能够提交提议。
from typing import List, Tuple

class PaxosNode:
    def __init__(self, node_id: int, nodes: List[int]):
        self.node_id = node_id
        self.nodes = nodes
        self.state = "proposed"
        self.proposed_value = None
        self.accepted_value = None
        self.accepted_round = -1
    
    def run_paxos(self, value: int) -> int:
        while True:
            if self.state == "proposed":
                self.proposed_value = value
                self.state = "prepare"
            
            if self.state == "prepare":
                max_round, max_val = self.prepare()
                if max_val is None:
                    self.state = "accept"
                else:
                    self.state = "proposed"
            
            if self.state == "accept":
                self.accepted_value = self.proposed_value
                self.accepted_round = max_round
                self.send_accept()
                self.state = "decided"
            
            if self.state == "decided":
                return self.accepted_value
    
    def prepare(self) -> Tuple[int, int]:
        max_round = -1
        max_val = None
        for node in self.nodes:
            round, val = node.receive_prepare()
            if round > max_round:
                max_round = round
                max_val = val
        
        return max_round, max_val
    
    def send_prepare(self, round: int):
        for node in self.nodes:
            node.receive_prepare_request(round, self.node_id)
    
    def receive_prepare_request(self, round: int, sender_id: int):
        if round > self.accepted_round:
            self.accepted_round = round
            self.send_prepare(round)
    
    def receive_prepare(self) -> Tuple[int, int]:
        return self.accepted_round, self.accepted_value
    
    def send_accept(self):
        for node in self.nodes:
            node.receive_accept_request(self.accepted_round, self.accepted_value)
    
    def receive_accept_request(self, round: int, value: int):
        if round >= self.accepted_round:
            self.accepted_round = round
            self.accepted_value = value
            self.send_accepted()
    
    def send_accepted(self):
        for node in self.nodes:
            node.receive_accepted(self.accepted_round, self.accepted_value)
    
    def receive_accepted(self, round: int, value: int):
        if round == self.accepted_round:
            self.proposed_value = value

示例代码

  • 示例代码为 Paxos 算法的一种实现,即便存在网络故障或其余可能产生的故障,也能在一组节点之间就某个值达成共识。
  • PaxosNode类示意分布式系统中参加 Paxos 算法的节点。
  • 构造函数承受 2 个参数,node_id是节点的 ID,nodes是零碎中所有节点的 ID 列表。
  • run_paxos办法运行 Paxos 算法,承受一个值作为输出,并返回零碎中所有节点都批准的值。该办法将有限循环执行,直到约定一个值为止。
  • prepare办法向零碎中所有节点发送 ”prepare” 音讯,并期待响应。该办法返回零碎中其余节点能够承受的最大整数值,如果不承受任何值,则返回None
  • send_prepare办法,当节点想要向零碎中的其余节点发送 ”prepare” 音讯时,调用该办法。
  • receive_prepare_request办法,当一个节点从另一个节点接管到 ”prepare” 音讯时,调用该办法。如果 ”prepare” 音讯携带的整数大于该节点承受的整数,则该节点更新其承受的整数,并向零碎中其余节点发送 ”prepare” 音讯。
  • receive_prepare办法,当节点接管到对 ”prepare” 音讯的响应时,调用该办法,返回可承受的整数值。
  • send_accept办法,当节点想要向零碎中的其余节点发送 ”accept” 音讯时,调用该办法。
  • receive_accept_request办法,当节点接管到来自另一个节点的 ”accept” 音讯时,调用该办法。如果 ”accept” 音讯携带的整数大于或等于该节点已承受的整数,则该节点更新其承受的整数值,并向零碎中其余节点发送 ”accepted” 音讯。
  • send_accepted办法,当节点想要向零碎中其余节点发送 ”accepted”音讯时,调用该办法。
  • receive_accepted办法,当节点从另一个节点接管到 ”accepted” 音讯时,调用该办法。如果 ”accepted” 音讯的整数与节点已承受的整数雷同,则节点应用 ”accepted” 音讯中的值更新其倡议值。

长处

  • 所有节点都批准零碎的状态
  • 能够容忍某些节点的故障,即便某些节点产生故障,零碎也能够持续运行

毛病

  • 算法比较复杂,难以实现
  • 算法执行较慢,可能导致系统提早减少
  • 算法须要在节点之间通信,减少了网络带宽和解决能力方面的开销

实用场景

  • 确保交易统一和精确的金融零碎
  • 确保所有节点具备雷同供应链视图的供应链管理系统

14. 工夫戳排序(Timestamp Ordering)

  • 一种用于在分布式系统中对事务排序的共识算法。
  • 每个事务被调配一个工夫戳,并且事务依照其工夫戳的程序执行。
  • 波及如下步骤:

    1. 每个节点为接管到的事务生成惟一的工夫戳,能够用全局时钟生成工夫戳,也能够应用带有某种同步机制的本地时钟生成工夫戳。
    2. 工夫戳由 (T, N) 组成,其中 T 为工夫戳值,N 为生成工夫戳的节点标识符。
    3. 当节点接管到新事务时,会依据之前接管到的所有事务的工夫戳查看该事务的工夫戳。如果新事务的工夫戳比之前接管到的事务的工夫戳都早,则立刻执行。如果新事务的工夫戳比以前接管到的事务的工夫戳都要新,那么将被提早,直到所有旧事务都执行完为止。
    4. 如果两个事务具备雷同的工夫戳,则应用 tie-breaking 机制来解决抵触。一种可能的 tie-breaking 机制是应用节点标识符作为判断规范,首先执行具备较低节点标识符的事务。
    5. 一旦事务被执行,后果就会流传到所有其余节点。
    6. 如果某个节点产生故障或断开网络连接,则一旦它重新加入网络,它的事务能够由另一个节点从新执行。事务能够依照工夫戳程序执行,以确保零碎状态保持一致。
from typing import List, Tuple

class Timestamp:
    def __init__(self, node_id: int):
        self.node_id = node_id
        self.counter = 0

    def increment(self):
        self.counter += 1

    def __str__(self):
        return f"{self.node_id}:{self.counter}"

class Event:
    def __init__(self, node_id: int, timestamp: Timestamp, data: str):
        self.node_id = node_id
        self.timestamp = timestamp
        self.data = data

    def __str__(self):
        return f"{self.node_id} {self.timestamp} {self.data}"

class Network:
    def __init__(self, nodes: List[int]):
        self.nodes = nodes
        self.message_queues = {node: [] for node in nodes}

    def send(self, sender: int, receiver: int, message: str):
        self.message_queues[receiver].append((sender, message))

    def receive(self, node_id: int) -> Tuple[int, str]:
        if len(self.message_queues[node_id]) > 0:
            return self.message_queues[node_id].pop(0)
        else:
            return None

class Node:
    def __init__(self, node_id: int, network: Network, initial_data: List[str]):
        self.node_id = node_id
        self.network = network
        self.clock = Timestamp(node_id)
        self.queue = []
        for data in initial_data:
            self.queue.append(Event(node_id, self.clock, data))
            self.clock.increment()

    def run(self):
        while True:
            event = self.queue.pop(0)
            print(f"Node {self.node_id} executing event {event}")
            self.clock = max(self.clock, event.timestamp)  # Update local clock

示例代码

  • 实现分布式系统的工夫戳排序算法
  • 定义了几个类和办法来实现工夫戳排序算法的模仿
  • Timestamp类示意工夫戳,由 node_idcounter组成
  • Event类示意分布式系统中的事件。事件由 node_id(生成事件的节点 ID)、timestamp(事件的工夫戳) 和data(事件的无效负载)组成。
  • Network类示意连贯分布式系统中节点的网络
  • Node类代表节点,由 node_id(节点 ID)、network(所连贯的网络)、clock(本地工夫戳) 和queue(节点曾经生成并期待执行的事件列表)组成。
  • increment()办法减少工夫戳的计数器值
  • __str__()办法以 ”node_id:counter“ 的格局返回工夫戳的字符串示意模式。
  • __str__()办法以 ”node_id:timestamp data“ 的格局返回事件的字符串示意模式。
  • send()办法 将音讯从一个节点发送到另一个节点
  • receive()办法接管给定节点队列中的下一条音讯,如果队列中没有音讯,则返回None
  • run()办法是节点的主循环,实现为有限循环,按工夫戳程序执行队列中的事件。当某个事件被执行时,节点将其本地时钟更新为其以后工夫戳和被执行事件工夫戳的最大值。
  • nodes属性是零碎中节点的 id 列表
  • message_queues属性将每个节点 ID 映射到字典

长处

  • 所有事件都是有序的,并且在零碎中所有节点上以统一的程序产生
  • 解决并发事件之间的抵触

毛病

  • 过程简单且资源密集
  • 时钟同步、一致性等问题

实用场景

  • 金融零碎: 交易依照正确程序进行解决,而不论来自哪个节点
  • 供应链管理系统: 跟踪货物流动,确保所有事件都依照正确的程序解决

15. 乐观并发管制(Optimistic Concurrency Control)
  • 一种在数据库管理系统 (DBMS) 中用于解决并发拜访数据的技术
  • 次要没有抵触,能够容许多个事务并发批改雷同的数据项
  • 波及如下步骤:

    • 开始事务(T1) —— 当事务开始时,读取须要的数据库记录,并将读取的版本记录在其公有工作区中。
    • 批改数据 —— 当事务批改数据时,将批改记录在其公有工作区中,而不更新理论的数据库记录。
    • 事务完结(T1) —— 当事务筹备提交时,查看是否有其余事务在读取数据后批改了雷同的数据。另外,将其公有工作区中的数据版本与数据库中数据的以后版本进行比拟。
    • 验证查看 —— 如果数据库中数据的以后版本与 T1 读取的版本雷同,则 T1 能够将其更改提交到数据库。然而,如果数据的以后版本与 T1 读取的版本不同,则意味着 T1 的更改与另一个事务的更改产生了抵触。
    • 回滚 —— 如果 T1 的更改与另一个事务的更改产生了抵触,那么 T1 必须停止并回滚其更改。T1 能够在提早后重试事务或采取其余适当操作。
    • 提交 —— 如果 T1 的更改不与任何其余事务的更改抵触,那么 T1 能够将其更改提交到数据库,用其公有工作区中所做的批改来更新数据库记录。
from typing import List

class Account:
    def __init__(self, id: int, balance: float):
        self.id = id
        self.balance = balance
        self.version = 0

    def withdraw(self, amount: float):
        self.balance -= amount
        self.version += 1

    def deposit(self, amount: float):
        self.balance += amount
        self.version += 1

class OptimisticConcurrencyControl:
    def __init__(self, accounts: List[Account]):
        self.accounts = accounts

    def transfer(self, sender_id: int, receiver_id: int, amount: float):
        # Find sender and receiver accounts
        sender = next(acc for acc in self.accounts if acc.id == sender_id)
        receiver = next(acc for acc in self.accounts if acc.id == receiver_id)

        # Create copies of the accounts to modify
        sender_copy = Account(sender.id, sender.balance)
        receiver_copy = Account(receiver.id, receiver.balance)

        # Withdraw from sender and deposit to receiver
        sender_copy.withdraw(amount)
        receiver_copy.deposit(amount)

        # Update the global accounts list if there are no conflicts
        for i, acc in enumerate(self.accounts):
            if acc.id == sender_id:
                if acc.version != sender.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = sender_copy
            elif acc.id == receiver_id:
                if acc.version != receiver.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = receiver_copy

示例代码

  • Account类示意蕴含 ID、余额和版本号的银行帐户,版本号用于跟踪帐户更新的次数。
  • OptimisticConcurrencyControl类将帐户列表作为输出。
  • transfer办法的输出为发送方和接管方账户 id 以及要转账的金额。
  • withdrawdeposit 办法,批改帐户余额并减少版本号。
  • 将全局帐号列表中帐号的版本号与发送方、接管方帐号的版本号进行比拟,查看更新帐号是否存在抵触,如果存在抵触就会引发异样。如果没有抵触,用批改后的帐户更新全局帐户列表。在发生冲突的状况下,能够重试事务,或者告诉用户手动解决抵触。

长处

  • 不须要锁或简单的数据结构
  • 事务能够同时批改雷同的数据,因而能够反对高并发性
  • 事务不须要期待锁被开释

毛病

  • 如果多个事务频繁批改雷同的数据项,将造成大量批改失败并回滚
  • 须要额定的解决开销来比拟数据版本并查看抵触
  • 不提供任何对于事务提交程序的保障

实用场景

  • 电子商务利用 —— 大量用户同时拜访同一个数据库
  • 银行和金融利用 —— 大量交易同时产生

挑战

  • 不正确的实现将导致数据不统一
  • 须要解决抵触和回滚的机制,会减少应用程序的复杂性

参考文献

What is a consensus algorithm?

Consensus Algorithms in Blockchain

How Many Consensus Algorithms Are There? An Overview

Analysis of the Blockchain Consensus Algorithms

Consensus Algorithms Distributed Systems

Multiversion Timestamp Ordering

DBMS Timestamp Ordering Protocol

Timestamp-based Concurrency Control

Timestamp Ordering Protocol in DBMS

Timestamp-based Ordering Protocol in DBMS

What is an optimistic concurrency control in DBMS

Optimistic vs Pessimistic Concurrency: What Every Developer Should Know

Dealing with Optimistic Concurrency Control Collisions


你好,我是俞凡,在 Motorola 做过研发,当初在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓重的趣味,平时喜爱浏览、思考,置信继续学习、一生成长,欢送一起交流学习。\
微信公众号:DeepNoMind

本文由 mdnice 多平台公布

正文完
 0