关于程序员:分布式事务的21种武器-2

32次阅读

共计 11453 个字符,预计需要花费 29 分钟才能阅读完成。

在分布式系统中,事务的解决散布在不同组件、服务中,因而分布式事务的 ACID 保障面临着一些非凡难点。本系列文章介绍了 21 种分布式事务设计模式,并剖析其实现原理和优缺点,在面对具体分布式事务问题时,能够抉择适合的模式进行解决。原文: Exploring Solutions for Distributed Transactions (2)”)

在不同业务场景下,能够有不同的解决方案,常见办法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后盾队列的异步解决(Using Queues to Process Asynchronously in the Background)
  4. TCC 弥补(TCC Compensation Matters)
  5. 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ 事务(MQ Transaction)
  7. Saga 模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 工夫戳排序(Timestamp Ordering)
  15. 乐观并发管制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发管制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍 TCC 弥补、本地音讯表 / 发件箱以及 MQ 事务三种模式。

4. TCC 弥补(Try-Confirm-Cancel Compensation Matters)

  • 用于解决跨多个服务的长时间事务。
  • 将一个简单事务合成为多个步骤,每个步骤都是一个独自的服务调用或数据库操作。事务中的每个步骤都有 3 个阶段: 尝试、确认和勾销。
  • 尝试 (Try) 阶段 —— 服务尝试执行操作,执行一系列查看,以确保执行操作是平安的。如果所有查看都通过,则执行该操作,并将状态保留在长期存储中。
  • 确认 (Confirm) 阶段 —— 服务确认操作胜利,验证事务状态,并确保满足所有依赖关系。如果所有内容都无效,则将长期状态提交给主数据库。
  • 勾销 (Cancel) 阶段 —— 如果在尝试阶段或确认阶段呈现谬误,将进入勾销阶段。服务将撤销在尝试阶段所做的更改,将状态复原到事务开始之前。
import requests

class OrderService:
    def __init__(self):
        self.session = requests.Session()

    def create_order(self, order_data):
        try:
            # Step 1: Reserve stock
            res = self.session.post('http://inventory-service/reserve-stock', json=order_data)
            res.raise_for_status()

            # Step 2: Charge payment
            res = self.session.post('http://payment-service/charge-payment', json=order_data)
            res.raise_for_status()

            # Step 3: Confirm order
            res = self.session.post('http://order-service/confirm-order', json=order_data)
            res.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Step 4: Cancel order
            self.session.post('http://order-service/cancel-order', json=order_data)
            self.session.post('http://payment-service/refund-payment', json=order_data)
            self.session.post('http://inventory-service/release-stock', json=order_data)
            raise e

示例代码

OrderService类遵循尝试 - 确认 - 勾销 (TCC) 模式创立订单。

订单创立过程包含 3 个步骤:

  1. 保留库存
  2. 付款
  3. 确认订单

任何一个步骤失败,都须要勾销订单并吊销所做的更改。这些都在异样处理程序中实现,该处理程序回滚 try 代码块执行期间所做的任何数据库更改。

reserve_stock() 办法中,查看是否有足够库存,如果有,则通过缩小数据库中的可用库存计数来保留库存,如果没有,则引发异样并回滚到目前为止所做的任何更改。

charge_payment() 办法中,查看客户是否有足够资金领取订单。如果有,将从余额中扣除订单金额,如果没有,则引发异样并回滚到目前为止所做的任何更改。

confirm_order() 办法中,将数据库中的订单状态更新为 ”confirmed”。如果在此步骤中出现异常,则勾销订单并回滚到目前为止所做的任何更改。

长处

  • 防止资源被长时间锁定
  • 解决失败或不残缺的交易时提供弥补机制
  • 在 2PC 不适宜的零碎中解决分布式事务

毛病

  • 须要额定工作为每个事务实现弥补机制
  • 没有原子性保障

实用场景

  • 解决零碎中的长时间事务

挑战

  • 实现弥补事务可能很简单
  • 在分布式环境中协调多个服务可能具备挑战性

5. 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  • 微服务架构中的消息传递模式。
  • 容许微服务应用本地音讯表作为两头缓冲区异步替换音讯。
  • 每个须要与其余微服务替换音讯的微服务都有本人的音讯表。
  • 当一个微服务须要向另一个微服务发送音讯时,会将音讯增加到本人的音讯表中,音讯中蕴含接管微服务解决申请所需的所有必要信息。
  • 一旦音讯被增加到表中,发送微服务就能够持续解决其余工作。同时,一个后盾工作线程或一个独自的过程继续监控音讯表中的新音讯。
  • 当检测到新音讯时,工作线程 / 过程检索该音讯并将其发送到接收端微服务。
  • 一旦接收端微服务解决了音讯,就会向发送端微服务发送确认,表明音讯已胜利解决。确认还能够蕴含发送端微服务须要的任何后果或数据。
  • 如果在肯定工夫内没有收到确认,发送端微服务能够假如音讯没有被胜利解决,并能够采取适当措施,例如从新发送音讯或勾销申请。
import psycopg2
from psycopg2 import sql
from psycopg2.extras import DictCursor
import json
import threading
import time

# Define the message schema
message_schema = {"id": "","data": {},"status":""
}

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname="my_database",
    user="my_username",
    password="my_password",
    host="localhost",
    port="5432"
)

# Create the messages table if it doesn't exist
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id UUID PRIMARY KEY,
            data JSONB NOT NULL,
            status TEXT NOT NULL
        )
    """)
    conn.commit()

# Define the message producer
def send_message(data):
    with conn.cursor() as cur:
        # Create the message using the schema
        message = message_schema.copy()
        message["id"] = str(uuid.uuid4())
        message["data"] = data
        message["status"] = "new"
        
        # Insert the message into the messages table
        cur.execute("""
            INSERT INTO messages (id, data, status)
            VALUES (%s, %s, %s)
        """, (message["id"], json.dumps(message["data"]), message["status"]))
        conn.commit()

# Define the message consumer
def message_consumer():
    while True:
        with conn.cursor(cursor_factory=DictCursor) as cur:
            # Select the oldest message from the messages table that has a status of "new"
            cur.execute("""
                SELECT *
                FROM messages
                WHERE status = 'new'
                ORDER BY id
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """)
            row = cur.fetchone()
            if row:
                message = dict(row)
                # Update the message status to "processing"
                cur.execute("""
                    UPDATE messages
                    SET status = 'processing'
                    WHERE id = %s
                """, (message["id"],))
                conn.commit()
                # Process the message
                process_message(message)
                # Update the message status to "processed"
                cur.execute("""
                    UPDATE messages
                    SET status = 'processed'
                    WHERE id = %s
                """, (message["id"],))
                conn.commit()
        time.sleep(1)

# Start the message consumer in a separate thread
consumer_thread = threading.Thread(target=message_consumer)
consumer_thread.start()

示例代码

  • 应用 PostgreSQL 作为数据库,应用 psycopg2 库与数据库交互。
  • send_message函数创立一条新音讯并将其插入 PostgreSQL 数据库。
  • message_consumer函数轮询数据库,查找状态为 ”new” 的新音讯,解决后将状态更新为 ” 已解决(processed)”。
  • 本地音讯表 (Local Message Table) 模式的理论实现可能会依据特定需要和所抉择的数据库解决方案而有所不同。

长处

  • 音讯不会失落,也不会被屡次解决。
  • 当在后盾替换音讯时,发送端微服务持续解决其余工作。
  • 微服务彼此解耦,能够独立开发和部署。
  • 容许在不影响现有零碎的状况下增加新的微服务。

毛病

  • 思考到音讯格局、音讯大小和错误处理等因素,可能会比较复杂。
  • 可能不适宜须要实时处理的场景,因为发送端和接收端微服务之间可能会有提早。
  • 须要额定的基础设施,如音讯队列或数据库来存储和治理音讯。

实用场景

  • 须要解决不同畛域业务的领有多个微服务的电子商务网站,如库存治理、订单解决和领取解决。
  • 可用于在不同服务之间共享电子医疗记录和医疗账单的医疗保健零碎。
  • 须要解决不同畛域业务的领有多个微服务的银行零碎,如账户治理、交易解决和欺诈检测。

挑战

  • 音讯格局 —— 音讯格局应该与将要替换音讯的所有服务兼容,可能须要跨服务的标准化。
  • 音讯大小 —— 音讯应该尽量小,以缩小解决工夫和在传输过程中产生谬误的可能性。
  • 错误处理 —— 解决音讯传输或业务过程中产生的谬误。例如,从新尝试失败的音讯或采取其余纠正措施。
  • 实时处理 —— 这种模式可能不适宜须要实时处理的场景,因为发送端和接收端音讯之间可能会有一些提早。

发件箱模式

  • 一种面向音讯的模式,有助于实现服务之间的事务一致性
  • 确保音讯只交付一次,并且即便产生失败或谬误,也保持一致的状态
  • 以下是一些步骤:

    1. 服务接管来自客户端或其余服务的申请
    2. 当收到申请时,服务将须要发送给其余服务的数据写入本人数据库中的发件箱表,发件箱表蕴含接管服务解决申请所需的所有必要信息
    3. 写到发件箱表后,服务提交数据库事务,以确保数据安全存储在数据库中
    4. 后盾运行的发件箱解决过程定期运行,读取数据库中的发件箱表,以查看须要发送的新音讯
    5. 如果发件箱解决过程在发件箱表中检测到新音讯,将从数据库中检索该音讯并将其发送给适当的音讯代理
    6. 一旦音讯被音讯代理接管,就被发送给接收端,接收端能够是其余服务或上游零碎
    7. 音讯胜利交付给接收端后,音讯代理向发件箱解决过程发送确认,以标记音讯已胜利发送
    8. 一旦发件箱解决过程接管到来自音讯代理的确认,将从发件箱表中删除该音讯以防止反复发送
import json
import pika
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, DateTime

# Create SQLAlchemy engine and metadata
engine = create_engine('postgresql://user:password@localhost:5432/db')
metadata = MetaData()

# Define the Outbox table schema
outbox_table = Table('outbox', metadata,
                     Column('id', Integer, primary_key=True),
                     Column('event_type', String),
                     Column('payload', String),
                     Column('created_at', DateTime)
                    )

# Define RabbitMQ connection parameters
rabbitmq_params = pika.ConnectionParameters(host='localhost', port=5672)

# Define RabbitMQ queue name
queue_name = 'outbox'

# Connect to RabbitMQ
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue=queue_name, durable=True)

# Define function to publish messages to RabbitMQ
def publish_message(message):
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2,  # Make messages persistent))

# Define function to read messages from the Outbox table and publish them to RabbitMQ
def process_outbox():
    with engine.connect() as conn:
        result = conn.execute(outbox_table.select().order_by(outbox_table.c.id))
        for row in result:
            # Create message payload as a dictionary
            payload = {'event_type': row['event_type'],
                'data': json.loads(row['payload'])
            }
            # Convert payload to JSON string
            message = json.dumps(payload)
            # Publish message to RabbitMQ
            publish_message(message)
            # Delete message from Outbox table
            conn.execute(outbox_table.delete().where(outbox_table.c.id == row['id']))

# Define function to write data to the Outbox table
def write_to_outbox(event_type, data):
    with engine.connect() as conn:
        # Convert data to JSON string
        payload = json.dumps(data)
        # Insert data into Outbox table
        conn.execute(outbox_table.insert().values(
            event_type=event_type,
            payload=payload,
            created_at=datetime.now()))

# Example usage:
write_to_outbox('user_created', {'user_id': 123, 'name': 'John Doe'})
process_outbox()

示例代码

  • 通过 SQLAlchemy 和 RabbitMQ 实现
  • 通过 SQLAlchemy 连贯到 PostgreSQL 数据库,并定义发件箱表构造
  • 通过 Pika 连贯到 RabbitMQ 音讯代理并定义队列名
  • write_to_outbox函数将数据写入发件箱表
  • process_outbox函数从发件箱表中读取音讯并公布到 RabbitMQ
  • publish_message函数向 RabbitMQ 公布音讯

长处

  • 即便数据库事务失败,所有事件也会公布到音讯代理
  • 将事件公布与主利用程序代码解耦
  • 容许高效的批处理事件

毛病

  • 须要额定的开发工作来实现发件箱表并将事件公布到音讯代理的后盾工作过程 / 线程
  • 为零碎架构引入了额定的复杂性
  • 须要认真治理数据库事务,以确保只有在事务胜利时才公布音讯

实用场景

  • 当微服务须要向多个订阅者公布事件时,例如在公布 / 订阅消息传递场景中,并确保在音讯流中捕捉所有数据库更新。
  • 须要向多个订阅者公布对于新订单、更新订单状态和客户反馈事件 (包含订单履行零碎、客户服务和营销) 的电子商务系统
  • 须要捕捉音讯流中的所有金融交易,以用于审计和听从性目标的金融零碎

挑战

  • 确保负责公布事件的后盾工作过程 / 线程具备高可用性和容错能力
  • 治理数据库事务,确保仅在关联的数据库事务胜利时才公布音讯
  • 在向音讯代理公布事件时处理错误和重试

注意事项

  • 音讯格局 —— 应用 JSON 或 Protobuf 音讯格局来定义要公布的事件。格局将取决于特定应用程序的需要,但必须与正在应用的音讯代理兼容。
  • 音讯大小 —— 取决于所公布事件的大小和复杂性。确保音讯大小不超过所应用的音讯代理的限度。大型音讯可能会导致性能问题,为了无效解决,可能须要将其拆分为较小的音讯。
  • 错误处理 —— 如果在事件公布过程中产生谬误,须要处理错误并重试。应应用后退策略执行重试,以防止音讯代理过载或引起拒绝服务攻打。如果在将事件插入发件箱表期间产生谬误,则不应将事件公布到音讯代理,以防止数据不统一。

类似点

  • 两种模式都应用音讯表来存储须要在服务之间替换的音讯
  • 两种模式都容许服务在后盾替换音讯时持续解决其余工作
  • 两种模式都提供了在微服务之间替换音讯的机制

不同点

  • 数据库 —— 发件箱模式应用数据库来存储所有须要替换的音讯,而本地音讯表模式为每个须要替换音讯的服务应用一个音讯表。
  • 实现 —— 发件箱模式通过后盾工作线程 / 过程从发件箱表中读取音讯,并发送到音讯代理(如 RabbitMQ 或 Kafka)。音讯表模式通过后盾工作线程 / 过程监督音讯表中的新音讯,并应用任何消息传递机制(如间接 HTTP 申请或音讯代理) 发送到接收端服务。
  • 用例 —— 发件箱模式在微服务须要向多个订阅者公布事件的场景中特地有用。当多个服务须要彼此替换音讯,但并非所有服务都须要接管每条音讯时,本地音讯表模式十分有用。
  • 挑战 —— 发件箱模式须要认真解决数据库事务,以确保只有在关联的数据库事务胜利时才公布音讯。本地音讯表模式可能须要更简单的错误处理,因为如果音讯未能达到目的地,可能须要从新尝试发送。

6. MQ 事务(MQ Transaction)

  • 一系列音讯操作被分组在一起作为单个工作单元。
  • 事务确保所有操作都胜利实现,或者所有操作都失败,从而确保即便在产生故障或谬误时,零碎也保持一致的状态。
  • 以确保原子性、一致性、隔离性和持久性 (ACID) 的形式解决音讯。
  • 步骤:

    1. 通过指定连贯设置 (如代理 URL、用户名和明码) 来建设到音讯代理的连贯。
    2. 一旦建设连贯,通过调用 begin 办法启动事务。此办法告诉音讯代理事务曾经启动,并且所有后续音讯操作应该作为事务组合在一起。
    3. 通过将音讯发送到队列或主题,执行须要作为事务组合在一起的音讯操作。
    4. 一旦胜利执行了所有音讯操作,通过调用 commit 办法提交事务。此办法告诉音讯代理,作为事务执行的所有音讯操作都已胜利实现。
    5. 如果事务期间产生任何谬误,则通过调用 rollback 办法回滚事务。此办法告诉音讯代理事务曾经失败,作为事务执行的所有音讯操作都应该吊销。
import stomp

class MyListener(stomp.ConnectionListener):

    def __init__(self, conn):
        self.conn = conn

    def on_message(self, headers, message):
        # Process the message
        print("Received message:", message)
        # Acknowledge the message
        self.conn.ack(id=headers['message-id'], subscription=headers['subscription'])

    def on_error(self, headers, message):
        print('Received an error"{}": {}'.format(headers, message))
    
    def on_disconnected(self):
        print('Disconnected from the message broker.')

# Define the message broker connection settings
broker_url = "tcp://localhost:61613"
username = "myusername"
password = "mypassword"
queue_name = "/queue/myqueue"

# Establish a connection to the message broker
conn = stomp.Connection([(broker_url, 61613)])
conn.set_listener('', MyListener(conn))
conn.start()
conn.connect(username, password)

try:
    # Begin the transaction
    conn.begin()
    # Send the first message
    conn.send(body="Message 1", destination=queue_name)
    # Send the second message
    conn.send(body="Message 2", destination=queue_name)
    # Commit the transaction
    conn.commit()
except Exception as ex:
    # Rollback the transaction in case of errors
    conn.rollback()
    print("Error sending messages:", str(ex))

# Disconnect from the message broker
conn.disconnect()

示例代码

  • 基于 Apache ActiveMQ 库实现
  • beginsendcommit 办法用于将两个音讯的发送组合为单个事务。
  • 如果在事务期间产生谬误,则调用 rollback 办法以确保事务中的音讯都没有被解决。
  • 并不是所有音讯代理都反对事务,理论实现可能因特定音讯代理和库而有所不同。

长处

  • 即便在网络故障或产生其余中断的状况下,也要确保将消息传递到预期的收件人
  • 一组音讯操作被视为独自的事务,要么处理事务中的所有音讯,要么不解决任何音讯
  • 能够解决大量音讯,并且能够依据须要横向扩大
  • 能够跨不同平台和编程语言应用,使其成为音讯替换的通用解决方案

毛病

  • 实现 MQ 事务可能很简单
  • 减少了音讯解决的开销,会影响零碎性能
  • 须要受权费用和硬件老本,实现可能比拟低廉

实用场景

  • 须要解决大量音讯并且须要保障原子性时
  • 解决与金融交易相干的音讯,如股票交易或在线领取
  • 解决与供应链治理相干的音讯,例如跟踪库存程度或协调交付
  • 解决与在线订购零碎相干的音讯,例如解决订单和跟踪发货

注意事项

  • 音讯格局 —— 取决于正在应用的具体实现。通常,音讯与其余元数据(如音讯头和属性) 一起被格式化为数据载荷,无效负载能够是各种格局,比方 XML、JSON 或二进制数据。
  • 音讯大小 —— 最大音讯大小取决于正在应用的具体实现。较大的音讯可能须要分成较小的块进行解决。
  • 错误处理 —— 产生谬误时,确保音讯解决不中断,谬误失去适当解决。因而,须要实现适当的重试机制和错误报告。例如,MQ 事务零碎向发送方返回谬误音讯,发送方能够从新发送该音讯或告诉管理员。须要实现一些错误处理策略,比方设置专用的错误处理流程、实现重试机制,或者应用日志记录和监控工具跟踪谬误和性能指标。

参考文献

Message Queue Transactions

Handling Transactions in IBM MQ

Message Queuing and the database: Solving the dual write problem

Message Queues: An Introduction

MSMQ Basics: Queues, Messages, Transactions

Reliable Microservices Data Exchange With the Outbox Pattern

Microservices Pattern: Transactional outbox

The Outbox Pattern

Outbox Pattern for Microservices Architecture

Microservices 101: Transactional Outbox and Inbox

Design Pattern for Distributed Transactions

Making Try-Confirm/Cancel Easy with MicroTx

Build software better, together

An In-Depth Analysis of Distributed Transaction Solutions


你好,我是俞凡,在 Motorola 做过研发,当初在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓重的趣味,平时喜爱浏览、思考,置信继续学习、一生成长,欢送一起交流学习。\
微信公众号:DeepNoMind

本文由 mdnice 多平台公布

正文完
 0