共计 9491 个字符,预计需要花费 24 分钟才能阅读完成。
在分布式系统中,事务的解决散布在不同组件、服务中,因而分布式事务的 ACID 保障面临着一些非凡难点。本系列文章介绍了 21 种分布式事务设计模式,并剖析其实现原理和优缺点,在面对具体分布式事务问题时,能够抉择适合的模式进行解决。原文: Exploring Solutions for Distributed Transactions (4)
在不同业务场景下,能够有不同的解决方案,常见办法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后盾队列的异步解决(Using Queues to Process Asynchronously in the Background)
- TCC 弥补(TCC Compensation Matters)
- 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ 事务(MQ Transaction)
- Saga 模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 工夫戳排序(Timestamp Ordering)
- 乐观并发管制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发管制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍原子提交、并行提交以及事务复制三种模式。
10. 原子提交(Atomic Commitment)
- 原子提交 是一种事务性流程,确保事务中的所有操作都胜利实现,如果其中任何一个操作失败,则所有操作都失败。
- 单个事务中可能波及多个流程。
-
波及如下步骤:
- 开始事务(Begin Transaction),事务开始于客户端或用户执行一系列操作的申请,这些操作被视为单个事务。事务协调器治理整个事务,负责将事务标记为 ” 正在进行中 ”。
- 筹备阶段(Prepare Phase),事务协调器向事务中波及的所有参与者发送音讯,确保他们可能执行所申请的操作。参与者回复音讯,示意是否能够执行操作。如果任何参与者不能执行操作,事务将被终止。
- 提交阶段(Commit Phase),如果事务中的所有参与者都能够执行操作,事务协调器将向所有参与者发送提交音讯,参与者依照批示执行操作,并回复 ” 确认 ” 音讯,表明曾经胜利实现了操作。
- 确认阶段(Finalize Phase),事务协调器期待所有参与者回复 ” 确认 ” 音讯。如果所有参与者都响应 ” 确认 ” 音讯,事务协调器将事务标记为 ” 已提交 ”。如果任何参与者响应失败,事务将被终止。
- 停止阶段(Abort Phase),如果事务在任何阶段被停止,事务协调器将向所有参与者发送停止音讯,要求吊销可能曾经执行的任何操作。参与者用 ” 确认 ” 音讯进行回复,表明曾经胜利吊销了操作。
import pymysql
# Connect to the database
connection = pymysql.connect(
host='localhost',
user='user',
password='password',
db='mydatabase',
autocommit=False
)
try:
# Start the transaction
with connection.cursor() as cursor:
cursor.execute("START TRANSACTION")
# Perform the SQL statements
with connection.cursor() as cursor:
cursor.execute("INSERT INTO products (name, price) VALUES ('Product A', 10)")
cursor.execute("UPDATE customers SET balance = balance - 10 WHERE id = 1")
# Commit the transaction
connection.commit()
except:
# Rollback the transaction if there is an error
connection.rollback()
finally:
# Close the database connection
connection.close()
示例代码
- 导入 PyMySQL 库,用于连贯 MySQL 数据库。
- 基于指定的凭据连贯到数据库,并将 ”auto-commit” 参数设置为 ”False”,这意味着将应用事务。
try
代码块将在事务中执行 SQL 语句。with
语句创立 cursor 对象,用于执行 SQL 语句。cursor.execute()
办法通过执行 SQL 命令start transaction
来启动事务。cursor.execute()
办法执行 2 条 SQL 语句: 一条INSERT
语句用于增加新产品,一条UPDATE
语句用于从客户余额中扣除价格。- 如果没有谬误,则调用
connection.commit()
办法提交事务,也就是说,将更改永恒保留到数据库中。 - 如果呈现谬误,则执行
except
代码块,该块通过connection.rollback()
办法回滚事务。在事务中所做的更改将被吊销,数据库将复原到初始状态。 connection.close()
办法敞开数据库连贯。
长处
- 一致性 —— 在事务执行后,数据库保持一致状态。要么保留所有更改,要么不保留任何更改,这有助于保护数据的完整性。
- 数据完整性 —— 防止出现不残缺数据或者执行了局部事务。
- 回滚 —— 如果事务中呈现谬误,则回滚整个事务,也就是说,吊销所做的所有更改,并将数据库复原到初始状态。
毛病
- 性能开销 —— 须要额定资源来确保事务主动执行
- 减少复杂性 —— 须要额定代码来确保事务主动执行,会减少应用程序的复杂性
实用场景
- 用于电子商务应用程序,以确保仅在付款胜利时才下订单
- 用于金融应用程序,以确保只有在满足所有必须条件时才实现事务
挑战
- 死锁 —— 当多个事务试图同时获取雷同的资源时
- 分布式事务 —— 当波及多个数据库时,设计变得更加简单
- 性能开销 —— 会导致性能开销
11. 并行提交(Parallel Commits)
- 容许多个事务并发提交更改
-
波及如下步骤:
- 启动事务 —— 多个事务由不同的用户或过程发动。每个事务都有本人要执行的一组 SQL 语句。
- WAL(Write-Ahead Logging) —— 数据库系统保护 WAL 来记录对数据库的所有更改。在进行任何更改之前,事务将更改的记录写入 WAL,确保在产生故障时能够将数据库复原到以前的状态。
- 执行 SQL 语句 —— 每个事务执行本人的一组 SQL 语句,这些语句能够包含更新、插入和删除。
- 锁 —— 当一个事务批改某个数据库记录时,就取得了对该记录的锁,确保没有其余事务能够同时批改同一记录。
- 提交阶段 —— 一旦事务执行了所有 SQL 语句并开释了锁,就进入提交阶段。在此阶段,事务向 WAL 写入提交记录,表明已筹备好提交。
- 并行提交 —— 一旦所有事务进入提交阶段,数据库系统就能够执行并行提交,这意味着零碎能够并发将多个事务所做的更改写入数据库。
- 写入数据库 —— 数据库系统将每个事务所做的更改写入数据库。因为每个事务曾经将其更改写入了 WAL,因而零碎能够疾速的将更改写入数据库。
- 完结事务 —— 一旦更改被写入数据库,事务被认为曾经实现,零碎开释事务持有的所有锁以及应用的资源。
import psycopg2
from psycopg2.extras import RealDictCursor
# Establish connection to the database
conn = psycopg2.connect(
host="localhost",
database="ecommerce",
user="postgres",
password="password"
)
# Initialize two transactions for User A and User B
with conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("BEGIN;")
cursor.execute("INSERT INTO cart (user_id, item_id) VALUES (1, 123);")
cursor.execute("UPDATE account SET balance = balance - 10 WHERE user_id = 1;")
cursor.execute("INSERT INTO orders (user_id, item_id, price) VALUES (1, 123, 10);")
cursor.execute("COMMIT;")
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("BEGIN;")
cursor.execute("INSERT INTO cart (user_id, item_id) VALUES (2, 456);")
cursor.execute("UPDATE account SET balance = balance - 20 WHERE user_id = 2;")
cursor.execute("INSERT INTO orders (user_id, item_id, price) VALUES (2, 456, 20);")
cursor.execute("COMMIT;")
# Close the database connection
conn.close()
示例代码
- 用
psycopg2
库连贯到 PostgreSQL 数据库,并执行 SQL 语句来更新数据库和创立新订单。 - 为用户 A 和用户 B 初始化 2 个事务,每个事务执行一组 SQL 语句来更新数据库并创立一个新订单。
BEGIN
语句标记事务的开始。COMMIT
语句标记事务的完结。INSERT
和UPDATE
语句批改数据库记录以反映用户所做的更改。- 一旦每个事务执行了 SQL 语句并开释了锁,就进入提交阶段,这就是并行提交过程产生的中央。
- 事务向 WAL 写入提交记录,表明曾经筹备好提交,数据库系统能够执行并行提交,将两个事务所做的更改并发写入数据库。
- 一旦将更改写入数据库,就认为每个事务都实现了,零碎开释事务持有的所有锁和资源。
长处
- 并行提交容许并发同时执行多个事务
- 通过向零碎中增加更多服务器或节点来横向扩大数据库系统规模
毛病
- 减少了数据库系统的复杂性
- 因为多个事务能够并发批改雷同的数据,因而减少了抵触和数据不统一的危险
- 须要比串行提交更多的资源
实用场景
- 容许多个用户在电子商务平台中同时结帐
- 容许多个用户在财务零碎中同时拜访帐户余额
- 使多个医疗保健业余人员可能在医疗保健零碎中并发拜访患者记录
- 容许多个用户同时更新运输和交付信息
挑战
- 治理锁以确保数据一致性和完整性
- 跨多个节点或服务器同步事务执行可能是一个挑战
12. 事务复制(Transactional Replication)
- 数据库复制流程,波及实时将事务从一个数据库复制和散发到另一个数据库。
- 通常在须要将数据从主数据库复制到一个或多个从数据库以进行备份、报告或其余目标时应用。
-
波及如下步骤:
-
配置 —— 设置事务性复制以配置主数据库和从数据库,包含设置适合的公布、订阅和散发代理,包含抉择要复制的表、列和数据类型,以及设置安全性和其余配置选项。
- 布局复制拓扑 —— 决定哪些数据库将充当发布者,哪些数据库将充当订阅者,以及将应用的复制类型(例如,单向或双向)。
- 配置主数据库 —— 设置适合的公布项,定义将要复制的表、列和数据类型,以及任何其余配置选项,如安全性和过滤。
- 配置从数据库 —— 设置适合的订阅项,定义复制数据的指标数据库,以及任何其余配置选项,例如安全性和错误处理。
- 设置散发代理 —— 将复制的数据从主数据库散发到从数据库,并且能够配置为间断运行或定时运行。
- 监控及排障 —— 一旦设置了复制,监控其性能并排除呈现的任何问题,例如失败的事务或连贯问题。
- 快照 —— 配置完数据库后,对主数据库中的数据进行快照。将主数据库中所选表中的所有数据复制到快照文件中,用该文件初始化从数据库。
- 散发 —— 快照实现后,散发过程开始。将产生在主数据库中的事务复制到散发数据库。散发数据库充当所有事务的地方存储库,并充当将事务散发到从数据库的源。
- 公布 —— 一旦事务被写入散发数据库,将被公布到对应的订阅。一个公布是一组蕴含一个或多个订阅的我的项目(即表、视图或存储过程),每个订阅都与某个特定的从数据库相关联。
- 订阅 —— 订阅公布并将复制的事务利用到从数据库。订阅过程包含设置订阅代理,该代理将事务从散发数据库复制到从数据库,并实时的将事务利用到从数据库。
-
import pyodbc
class Replication:
def __init__(self, publisher_conn_str, subscriber_conn_str):
self.publisher_conn = pyodbc.connect(publisher_conn_str)
self.subscriber_conn = pyodbc.connect(subscriber_conn_str)
def add_publication(self, pub_name, table_name):
with self.publisher_conn.cursor() as cursor:
# Create publication
cursor.execute("EXEC sp_addpublication @publication = ?, @description ='Transaction Replication', @sync_method ='native', @repl_freq ='continuous'", pub_name)
# Add article to publication
cursor.execute("EXEC sp_addarticle @publication = ?, @article = ?, @source_owner ='dbo', @source_object = ?, @type ='logbased', @destination_table = ?, @pre_creation_cmd ='truncate'", pub_name, table_name, table_name, table_name)
# Enable publication for subscription
cursor.execute("EXEC sp_addsubscription @publication = ?, @subscriber = ?, @destination_db = ?, @sync_type ='initialize with backup', @backupdevicetype ='disk', @backupdevicename ='C:\\backup.bak', @update_mode ='read only'", pub_name, self.subscriber_conn.database, self.subscriber_conn.database)
# Create snapshot agent
cursor.execute("EXEC sp_addpublication_snapshot @publication = ?, @frequency_type = 4, @frequency_interval = 1, @frequency_relative_interval = 1, @frequency_recurrence_factor = 0, @frequency_subday = 8, @frequency_subday_interval = 1, @active_start_time_of_day = 0, @active_end_time_of_day = 235959, @active_start_date = 0, @active_end_date = 0, @job_login = null, @job_password = null, @publisher_security_mode = 1", pub_name)
def add_subscription(self, sub_name, pub_name):
with self.subscriber_conn.cursor() as cursor:
# Add subscription to publication
cursor.execute("EXEC sp_addsubscription @publication = ?, @subscriber = ?, @destination_db = ?, @sync_type ='initialize with backup', @backupdevicetype ='disk', @backupdevicename ='C:\\backup.bak', @update_mode ='read only'", pub_name, sub_name, self.subscriber_conn.database)
# Create subscription agent
cursor.execute("EXEC sp_addsubscription_agent @publication = ?, @subscriber = ?, @subscriber_db = ?, @job_login = null, @job_password = null, @subscriber_security_mode = 1", pub_name, sub_name, self.subscriber_conn.database)
def start(self):
with self.publisher_conn.cursor() as cursor:
# Start snapshot agent
cursor.execute("EXEC sp_startpublication_snapshot @publication = ?", pub_name)
# Start distribution agent
cursor.execute("EXEC sp_startpublication_agent @publication = ?, @publisher_security_mode = 1, @publisher_login = null, @publisher_password = null", pub_name)
示例代码
- 基于
pyodbc
库连贯到两个 ODBC 数据库 replication
类创立复制对象,用于治理复制过程-
add_publication()
办法在发布者数据库上创立公布,此公布蕴含在订阅者数据库中复制的数据,承受 2 个参数pub_name
和table_name
,别离示意公布的名称和要复制的表的名称,而后用pyodbc cursor
执行 4 条 SQL 命令。- 第 1 条命令: 用
sp_addpublication
存储过程创立公布,该存储过程有几个参数,包含公布名称、形容、同步办法和复制频率。 - 第 2 条命令: 用
sp_addarticle
存储过程向公布增加一条记录,该存储过程指定要复制的表、公布名称和其余参数。 - 第 3 条命令: 用
sp_addsubscription
存储过程启用订阅的公布,该存储过程指定公布名称、订阅者数据库和其余参数。 - 第 4 条命令: 用
sp_addpublication_snapshot
存储过程创立快照代理,该存储过程指定公布名称和其余参数。
- 第 1 条命令: 用
-
add_subscription()
办法在订阅者数据库上创立订阅。此订阅指定订阅者将接管的公布,承受两个参数sub_name
和pub_name
,别离示意订阅的名称和公布的名称。而后用pyodbc cursor
执行 2 条 SQL 命令。- 第 1 条命令: 用
sp_addsubscription
存储过程向公布增加订阅,该存储过程指定公布名称、订阅者数据库和其余参数。 - 第 2 条命令: 用
sp_addsubscription_agent
存储过程创立订阅代理,该存储过程指定公布名称、订阅数据库和其余参数。
- 第 1 条命令: 用
-
start()
办法通过启动快照代理和散发代理开始复制过程,不须要任何参数,用pyodbc cursor
执行 2 条 SQL 命令。- 第 1 条命令: 用
sp_startpublication_snapshot
存储过程启动快照代理,指定公布名称。 - 第 2 条命令: 用
sp_startpublication_agent
存储过程启动散发代理,指定公布名称和其余参数。
- 第 1 条命令: 用
长处
- 在数据库之间提供近乎实时的数据同步
- 通过放弃多个数据库同步,反对高可用性和劫难复原
- 容许多个订阅者从单个发布者接管更新
- 基于某些规范过滤数据,容许选择性复制数据
- 反对冲突检测和解决,容许主动或手动解决抵触
毛病
- 与其余复制办法相比,因为须要捕捉和流传单个事务,须要更多资源
- 与其余复制办法相比,设置和治理非常复杂
- 因为捕捉事务的开销,会在源数据库中引入提早
实用场景
- 零售业用来同步不同商店和在线渠道的最新库存状态
- 金融机构在多个分支机构和在线平台上放弃客户账户余额和交易同步
- 制作公司放弃多个工厂和配送核心的生产打算和库存程度同步
挑战
- 配置和治理事务复制很简单
- 网络提早和带宽限度可能会影响复制性能,特地是在长距离或跨不牢靠网络复制数据时
参考文献
Atomic Commit Protocol in Distributed System
Atomic Commitment: The Unscalability Protocol
What is Atomic Commit Protocols
Parallel Commits: An atomic commit protocol for globally distributed transactions
Parallel Commits: An Atomic Commit Protocol for Distributed Transactions
Parallel Commits for Transactions Using postgres_fdw on PostgreSQL 15
parallel commit in postgres fdw
Transactional replication
Transactional Replication – SQL Server
Tutorial: Configure Transactional Replication – SQL Server
SQL Server replication: Overview of components and topography
How to Set Up Transactional Replication
你好,我是俞凡,在 Motorola 做过研发,当初在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓重的趣味,平时喜爱浏览、思考,置信继续学习、一生成长,欢送一起交流学习。\
微信公众号:DeepNoMind
本文由 mdnice 多平台公布