关于程序员:分布式事务的21种武器-6

3次阅读

共计 8061 个字符,预计需要花费 21 分钟才能阅读完成。

在分布式系统中,事务的解决散布在不同组件、服务中,因而分布式事务的 ACID 保障面临着一些非凡难点。本系列文章介绍了 21 种分布式事务设计模式,并剖析其实现原理和优缺点,在面对具体分布式事务问题时,能够抉择适合的模式进行解决。原文: Exploring Solutions for Distributed Transactions (6)

在不同业务场景下,能够有不同的解决方案,常见办法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后盾队列的异步解决(Using Queues to Process Asynchronously in the Background)
  4. TCC 弥补(TCC Compensation Matters)
  5. 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ 事务(MQ Transaction)
  7. Saga 模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 工夫戳排序(Timestamp Ordering)
  15. 乐观并发管制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发管制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍拜占庭容错、分布式锁以及分片三种模式。

16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  • 一种容错机制,容许分布式系统在存在故障节点的状况下失常运行。
  • 节点之间互相通信从而就决策达成共识,即便存在某些歹意或谬误节点,也能够工作。
  • 波及如下步骤:

    1. 零碎中每个节点向所有其余节点发送音讯,其中蕴含倡议的决策或值的信息。
    2. 每个节点查看接管到的音讯的有效性。如果接管到来自故障节点或攻击者的音讯,则疏忽该音讯。
    3. 每个节点收集接管到的所有无效音讯,并创立零碎视图(view)。视图是一组特定节点的无效音讯,每个节点与零碎中的所有其余节点共享视图。
    4. 每个节点查看从其余节点接管到的视图的有效性。如果接管到有效视图,则疏忽该视图。
    5. 每个节点创立蕴含其零碎视图的证书(certificate),并与所有其余节点共享。证书是通过签名的申明,用于证实视图的有效性。
    6. 每个节点查看从其余节点收到的证书的有效性。如果收到有效证书,应该疏忽该证书。
    7. 一旦节点验证了收到的所有证书,就能够对提议的决策或值达成共识。每个节点依据收到的证书对雷同的值达成共识。
    8. 一旦达成共识,每个节点执行已达成统一的决策。
from typing import List, Dict, Tuple

class Message:
    def __init__(self, sender: int, content: str):
        self.sender = sender
        self.content = content

class ByzantineNode:
    def __init__(self, id: int, network: Dict[int, List[Message]], threshold: int):
        self.id = id
        self.network = network
        self.threshold = threshold
        self.decisions = {}

    def send_message(self, receiver: int, content: str):
        message = Message(self.id, content)
        self.network[receiver].append(message)

    def receive_messages(self) -> List[Message]:
        messages = self.network[self.id]
        self.network[self.id] = []
        return messages

    def generate_vote(self, messages: List[Message]) -> bool:
        count = 0
        for message in messages:
            if message.content == 'True':
                count += 1
            elif message.content == 'False':
                count -= 1
        return count >= self.threshold

    def run_bft(self, decision_content: str):
        # Phase 1: Broadcast proposal to all nodes
        proposal = Message(self.id, decision_content)
        for node_id in self.network:
            self.send_message(node_id, str(proposal))

        # Phase 2: Receive messages and generate votes
        messages = self.receive_messages()
        vote = self.generate_vote(messages)

        # Phase 3: Broadcast decision to all nodes
        decision = Message(self.id, str(vote))
        for node_id in self.network:
            self.send_message(node_id, str(decision))

        # Phase 4: Receive decisions and count votes
        decisions = [m.content for m in self.receive_messages()]
        count_true = decisions.count('True')
        count_false = decisions.count('False')

        # Record decision if it meets threshold, else record failure
        if count_true >= self.threshold:
            self.decisions[decision_content] = True
        elif count_false >= self.threshold:
            self.decisions[decision_content] = False
        else:
            self.decisions[decision_content] = None

示例代码

  • 只有故障节点数量小于阈值,该算法就能够容忍故障。
  • 由两个类组成:

    1. Message —— 示意网络中节点之间发送的音讯,蕴含发送者 ID 和音讯内容。
    2. ByzantineNode —— 示意网络节点,蕴含 ID、网络拓扑、容忍的故障数量阈值,以及存储节点决策的字典。

      • ByzantineNode 类提供了几个办法:
      • send_message()办法 —— 发送音讯到网络中的另一个节点
      • receive_messages()办法 —— 检索自上次调用 receive_messages() 以来收到的所有音讯
      • generate_vote()办法 —— 将音讯列表作为输出,并依据音讯内容生成投票。如果 ”True” 音讯的数量大于或等于阈值,则该办法返回 True,否则返回 False。
      • run_bft()办法 —— 实现 BFT 算法的 4 个阶段。

        • 阶段 1 —— 用 send_message()办法向网络中所有节点播送提案。提案是个 Message 对象,其内容是作为参数传递给 run_bft()办法的 decision_content。
        • 阶段 2 —— 用 receive_messages()办法接管来自网络中其余节点的音讯。用 generate_vote()办法依据收到的音讯生成投票,依据收到的 ”True” 和 ”False” 数量,投票 ”True” 或 ”False”。
        • 阶段 3 —— 用 send_message()办法将决策播送到网络中所有节点。决策是个 Message 对象,其内容为上一阶段生成的投票。
        • 阶段 4 —— 计算收到的票数,如果票数达到阈值,则记录决策。应用 receive_messages()办法检索自上次调用 receive_messages()以来收到的所有音讯。查看每条音讯内容,并计算 ”True” 和 ”False” 音讯的数量。如果 ”True” 的数量大于或等于阈值,则将该决策记录为 True。如果 ”False” 的数量大于或等于阈值,则该决策被记录为 False。如果两个条件都不满足,则该决策记录为 None。

长处

  • 在分布式系统中容忍肯定数量的谬误或失败
  • 即便存在故障节点或歹意攻打,也能确保分布式系统中所有节点达成统一的决策
  • 为用于加密货币和其余利用的区块链网络提供高水平的安全性和弹性

毛病

  • 可能须要低廉的计算,并且须要节点之间有高质量网络通信,否则可能会减少提早并升高零碎性能
  • 因为可能须要节点之间的高级协调和通信,因而可能不适宜所有类型的分布式系统
  • 不能为分布式系统中所有类型的故障或攻打提供残缺解决方案

实用场景

  • 金融零碎 —— 股票交易
  • 基础设施零碎 —— 电网或运输系统
  • 区块链网络 —— 加密货币和其余利用

挑战

  • 设计和实现 BFT 零碎可能很简单,并且须要在分布式系统、密码学和安全性方面具备高水平专业知识。
  • 确保所有节点都是可信、没有歹意的。
  • 在 BFT 零碎中实现高性能和低提早具备挑战性。

17. 分布式锁(Distributed Locking)
  • 治理分布式系统中共享资源的拜访。
  • 保证系统中多个节点不能同时拜访或批改雷同的资源,防止可能的不统一和数据损坏。
  • 波及以下步骤:

    1. 节点申请对共享资源加锁。申请蕴含资源的惟一标识符以及所申请的锁类型(例如,读或写)。
    2. 锁管理器治理锁,接管申请,并查看资源是否曾经锁定。如果资源未被锁定,锁管理器将锁授予申请节点并发送确认。
    3. 如果资源曾经被锁定,锁管理器查看申请节点是否被受权拜访该资源。如果该节点已取得受权,锁管理器将该申请增加到资源的挂起申请队列中,并向申请节点发送确认信息。如果该节点未被受权,则锁管理器回绝该申请并发送回绝音讯。
    4. 在期待授予锁时,申请节点定期轮询锁管理器以获取锁状态。
    5. 当节点拜访完资源后,通过向锁管理器发送开释申请来开释锁。锁管理器从资源中删除锁,并将锁授予队列中的下一个节点(如果有的话)。
    6. 如果持有锁的节点产生故障或解体,锁管理器将检测到该故障,并代表产生故障的节点开释锁,而后将锁授予队列中的下一个节点(如果有的话)。
    7. 如果节点申请锁,但没有收到锁管理器的响应,那么假设锁管理器曾经失败,并通过选举新的领导节点来接管锁管理器的角色。
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
import time

class DistributedLock:
    def __init__(self, zk_address, lock_path):
        self.zk = KazooClient(hosts=zk_address)
        self.lock_path = lock_path
        self.lock = None

    def __enter__(self):
        self.zk.start()
        self.lock = self.zk.Lock(self.lock_path)
        try:
            self.lock.acquire(timeout=10)
        except LockTimeout:
            self.zk.stop()
            raise Exception("Timeout while waiting for lock")

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()
        self.zk.stop()

if __name__ == '__main__':
    zk_address = 'localhost:2181'
    lock_path = '/my_lock'
    with DistributedLock(zk_address, lock_path):
        print("Acquired lock!")
        time.sleep(10)
    print("Released lock!")

示例代码

  • 基于 Apache ZooKeeper 分布式协调服务
  • 导入所需的库 —— KazooClient 和 LockTimeout

    • KazooClient —— ZooKeeper 的 Python 客户端
    • LockTimeout —— 当不能在指定超时内取得锁时引发异样。
  • 定义 DistributedLock 类 —— 承受 2 个参数: zk_addresslock_path

    • zk_address —— ZooKeeper 服务器地址
    • lock_path —— 锁节点在 ZooKeeper 中的门路
    • init 办法中初始化 ZooKeeper 客户端,并存储lock_path,并且将锁变量初始化为 None。
    • enter 办法中启动 ZooKeeper 客户端,创立锁对象,并尝试获取锁。如果锁不能在 10 秒内取得,则引发 LockTimeout 异样。
    • exit 办法中开释锁并进行 ZooKeeper 客户端。
  • 在代码主体局部,用指定的 ZooKeeper 地址和锁门路创立 DistributedLock 类实例。
  • with 语句获取锁。
  • 当取得锁时,打印一条音讯,示意已取得锁,而后 sleep 10 秒,以模仿持有锁时正在实现的一些工作。
  • sleep 后,锁会被 with 语句主动开释,打印一条音讯,表明锁已被开释。

长处

  • 在分布式系统中,通过确保一次只有一个过程能够批改共享资源来保护数据一致性
  • 避免多个过程同时访问共享资源,以确保该资源在须要时始终可用
  • 容许多个过程跨多个节点访问共享资源

毛病

  • 须要在分布式系统中的多个节点之间进行协调
  • 在分布式系统中引入提早并升高性能
  • 如果分布式锁定机制失败,可能会导致整个分布式系统失败

实用场景

  • 分布式数据库
  • 电子商务系统 —— 治理对购物车的拜访或避免多个用户同时购买雷同的商品

挑战

  • 须要偏心分配资源,以确保所有过程都能平等访问共享资源
  • 不正确实现的分布式锁定可能导致死锁,多个过程期待彼此开释锁

18. 分片(Sharding)
  • 用于在多个服务器之间对数据进行程度分区,称为分片。
  • 每个分片蕴含数据的一个子集,所有分片组合起来就形成了残缺的数据集。
  • 用于进步分布式数据库的可伸缩性、性能和可用性。
  • 波及如下步骤:

    1. 依据分片键将数据划分为更小的子集。分片键的抉择使得数据能够均匀分布在各个分片上,并且能够将查问路由到正确的分片。
    2. 数据分区后,分片散布在多个服务器上。每个分片一个特定服务器,多个分片能够调配给同一个服务器。
    3. 当客户端向数据库发送查问时,该查问首先路由到协调器节点。协调器节点负责确定哪个分片蕴含执行查问所需的数据。
    4. 一旦协调节点确定了正确的分片,查问将被发送到蕴含该分片的服务器。服务器执行查问并将后果返回给协调器节点。
    5. 如果须要来自多个分片的数据实现查问,协调节点将每个分片的后果聚合并将最终后果返回给客户端。
import mysql.connector

# Connect to MySQL database
mydb = mysql.connector.connect(
  host="localhost",
  user="yourusername",
  password="yourpassword",
  database="mydatabase"
)

# Define sharding rules
shard_key = "user_id"
num_shards = 4

# Create sharded tables
for i in range(num_shards):
    cursor = mydb.cursor()
    cursor.execute(f"CREATE TABLE users_{i} (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), email VARCHAR(255))")

# Insert data into sharded tables
users = [{"id": 1, "name": "John", "email": "john@example.com"},
    {"id": 2, "name": "Jane", "email": "jane@example.com"},
    {"id": 3, "name": "Bob", "email": "bob@example.com"},
    # ...
]

for user in users:
    shard_id = user[shard_key] % num_shards
    cursor = mydb.cursor()
    cursor.execute(f"INSERT INTO users_{shard_id} (id, name, email) VALUES (%s, %s, %s)", (user["id"], user["name"], user["email"]))

# Query data from sharded tables
cursor = mydb.cursor()
cursor.execute("SELECT * FROM users_0 UNION SELECT * FROM users_1 UNION SELECT * FROM users_2 UNION SELECT * FROM users_3")
users = cursor.fetchall()

print(users)

示例代码

  • 应用 MySQL
  • 可能因用例和所应用的数据库系统而异。
  • 连贯 MySQL 数据库,定义分片规定。在本例中,咱们用 user_id 作为分片键,并创立 4 个分片表来存储数据。
  • 通过计算基于用户 ID 的分片 ID,并用该 ID 将数据插入到相应的分片表中,从而将数据插入到分片表。
  • 应用 UNION 语句查问所有分片表中的数据并打印后果。

长处

  • 容许数据库程度扩容,因而随着数据增长,能够向零碎增加额定的服务器来解决减少的负载。
  • 因为每个服务器只须要搜寻较小的数据集,因而能够更快执行查问。
  • 如果一台服务器呈现故障,只会影响局部数据,零碎的其余部分能够持续失常运行。

毛病

  • 须要确保对数据进行了正确的分区,并且分片均匀分布在各个服务器上。
  • 保护所有分片之间的数据一致性可能具备挑战
  • 实现分片须要额定的硬件、软件和保护老本

实用场景

  • 实用于高读写负载的大型数据库,能够横向扩大以解决减少的流量。当存在天文或法规限度,须要将数据存储在不同地位时,会十分有用。
  • 生成大量数据并须要疾速存储和检索的社交媒体平台
  • 解决大量交易,须要疾速存储和检索数据的电商平台
  • 须要疾速平安的存储和检索大量患者数据的医疗保健利用

参考文献

Byzantine Fault Tolerance (BFT) | River Glossary

What Is Byzantine Fault Tolerance?

Byzantine Fault Tolerance (BFT) Explained

Distributed Locks with Redis

How to do distributed locking

Distributed Locking

Sharding

What is Database Sharding?

What is Sharding?

Understanding Database Sharding


你好,我是俞凡,在 Motorola 做过研发,当初在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓重的趣味,平时喜爱浏览、思考,置信继续学习、一生成长,欢送一起交流学习。微信公众号:DeepNoMind

本文由 mdnice 多平台公布

正文完
 0