在分布式系统中,事务的解决散布在不同组件、服务中,因而分布式事务的 ACID 保障面临着一些非凡难点。本系列文章介绍了 21 种分布式事务设计模式,并剖析其实现原理和优缺点,在面对具体分布式事务问题时,能够抉择适合的模式进行解决。原文: Exploring Solutions for Distributed Transactions (3)
在不同业务场景下,能够有不同的解决方案,常见办法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后盾队列的异步解决(Using Queues to Process Asynchronously in the Background)
- TCC 弥补(TCC Compensation Matters)
- 本地音讯表(异步保障)/ 发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ 事务(MQ Transaction)
- Saga 模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 工夫戳排序(Timestamp Ordering)
- 乐观并发管制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发管制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍 Saga、事件驱动以及 CQRS 三种模式。
7. Saga 模式(Saga Pattern)
- 治理跨多个微服务的长时间事务。
- 将事务合成为一系列较小的、独立的步骤,每个步骤都由独自的微服务治理。
-
蕴含如下步骤:
- 协调微服务负责接管事务初始申请。
- 协调微服务通过向第一个负责处理事务的微服务发送音讯来启动事务。
- 第一个微服务执行事务,并将音讯发送回协调微服务,反馈其是否胜利。
- 如果第一步胜利,协调微服务将向负责事务下一步的微服务发送音讯。
- 如果第一步失败,协调微服务发送弥补动作来吊销失败步骤的影响。
- 反复步骤 3 -5,直到每个微服务要么实现其步骤,要么在失败时触发弥补操作(回滚)。
- 一旦所有步骤都胜利实现,协调微服务就会发送一条音讯,表明整个事务曾经胜利。
- 如果任何步骤失败并且触发了弥补操作(回滚),则协调微服务将发送一条音讯,批示整个事务失败。
import pika
import json
# Define the RabbitMQ connection parameters
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
# Define the messages to be sent between services
start_order_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
payment_message = {'order_id': '12345', 'amount': 100.0}
shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the compensation messages to be sent in case of failure
cancel_payment_message = {'order_id': '12345', 'amount': 100.0}
cancel_shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the function to send messages to the RabbitMQ broker
def send_message(queue_name, message):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
connection.close()
# Define the function to handle the start of the order
def start_order():
# Send the start order message to the Order service
send_message('start_order', start_order_message)
# Define the function to handle the payment of the order
def payment():
try:
# Send the payment message to the Payment service
send_message('payment', payment_message)
except Exception as e:
# Send the cancel payment message to the Payment service in case of failure
send_message('cancel_payment', cancel_payment_message)
raise e
# Define the function to handle the shipping of the order
def shipping():
try:
# Send the shipping message to the Shipping service
send_message('shipping', shipping_message)
except Exception as e:
# Send the cancel shipping message to the Shipping service in case of failure
send_message('cancel_shipping', cancel_shipping_message)
raise e
# Define the function to handle the cancellation of the order
def cancel_order():
# Send the cancel payment message to the Payment service
send_message('cancel_payment', cancel_payment_message)
# Send the cancel shipping message to the Shipping service
send_message('cancel_shipping', cancel_shipping_message)
# Define the main function to execute the Saga
def execute_saga():
try:
# Start the order
start_order()
# Perform the payment
payment()
# Perform the shipping
shipping()
except Exception as e:
# Cancel the order in case of failure
cancel_order()
raise e
# Call the main function to execute the Saga
execute_saga()
示例代码
- 用 RabbitMQ 作为简略音讯代理
- 定义了在服务之间发送的五条音讯:
start_order_message
、payment_message
、shipping_message
、cancel_payment_message
以及cancel_shipping_message
。 start_order
函数将start_order_message
发送给order_service
。- 在收到来自
start_order
函数的音讯后,order_service
创立订单,并发送回蕴含order_id
的确认音讯。 - 一旦
start_order
函数接管到确认音讯,将发送payment_message
给payment_service
来解决订单领取。 - 如果领取胜利,
payment_service
将返回一条蕴含payment_id
的确认音讯。 start_order
函数将shipping_message
发送给shipping_service
,以便在付款胜利后发货。- 如果发货胜利,
shipping_service
将返回一条蕴含shipping_id
的确认音讯。 - 如果上述任何步骤失败,则回滚事务,别离给
shipping_service
和payment_service
发送cancel_shipping_message
和cancel_payment_message
,撤销所做的更改。 - 通过向
start_order
发送初始音讯、监听确认音讯以及在产生故障时解决弥补来解决整个 Saga 流程。换句话说,Saga 模式波及一系列弥补操作,以便在产生故障时吊销事务的影响。
长处
- 治理跨多个微服务的长时间事务
- 防止服务独立运行时呈现不统一或数据损坏
- 如果事务中的某个步骤失败,则提供弥补操作
- 容许服务自主、独立运行
毛病
- 实现这种模式可能比较复杂
- 很难设计和测试弥补逻辑
- 会给零碎减少额定的复杂性,使保护和故障排除变得更加艰难
实用场景
- 波及多个服务 (如领取解决、订单履约、物流) 的电子商务交易
- 波及多个零碎和服务的简单金融交易
- 波及多个供应商、制造商和物流供应商的供应链管理系统
尝试 - 确认 - 勾销 (TCC) 模式与 Saga 模式的相似之处
- 两种模式都保护分布式事务中波及多个微服务的数据一致性
- 两种模式都要求每个服务定义一组须要作为事务一部分的操作
尝试 - 确认 - 勾销 (TCC) 模式与 Saga 模式的不同之处
- Saga 模式应用前向复原法,每个服务在呈现故障时启动一个弥补事务,而 TCC 模式应用后向复原法,每个服务验证事务是否能够持续,而后才确认或勾销。
- Saga 模式将事务示意为事件序列,事件由相干服务之间发送的音讯示意。TCC 模式将事务示意为由所波及的每个服务执行的操作序列。
- Saga 模式实用于波及多个服务的长时间事务,而 TCC 模式实用于波及较少服务的短时间事务。
- Saga 模式的实现可能比 TCC 模式更简单,要求每个服务可能发动弥补事务并解决潜在故障。
8. 事件驱动(Event Sourcing)
- 对应用程序状态所做的所有更改作为一系列事件。
- 将这些事件存储在数据库或事件日志中,从而提供应用程序状态随工夫变动的残缺审计跟踪。
-
波及如下步骤:
- 每当应用程序状态发生变化时,就会捕捉相应事件,事件蕴含所有更改相干信息(例如已批改的数据和进行更改的用户)。
- 事件存储在事件日志中,能够用数据库或音讯代理实现,每个事件都有一个惟一标识符,并带有工夫戳,以确保事件有序。
- 通过按工夫程序重播事件日志中的事件来重构应用程序以后状态。该过程包含将应用程序的状态初始化为其初始状态,而后顺次利用每个事件来更新状态。
- 一旦状态被重构,就能够对其进行查问,以提供无关应用程序以后状态的信息。
- 能够实时处理事件,触发其余动作或更新。
- 事件处理实现后,能够将其归档或删除以开释存储空间。
import uuid
import json
import time
class BankAccount:
def __init__(self):
self.balance = 0
self.event_sourcing = EventSourcing()
def deposit(self, amount):
event = Event('deposit', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance += amount
def withdraw(self, amount):
if self.balance < amount:
raise ValueError('Insufficient balance')
event = Event('withdraw', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance -= amount
def get_balance(self):
return self.balance
def get_events(self):
return self.event_sourcing.get_events()
def get_event_by_id(self, event_id):
return self.event_sourcing.get_event_by_id(event_id)
def replay_events(self):
self.balance = 0
for event in self.event_sourcing.get_events():
if event.type == 'deposit':
self.balance += event.data['amount']
elif event.type == 'withdraw':
self.balance -= event.data['amount']
class Event:
def __init__(self, type, data):
self.id = uuid.uuid4()
self.timestamp = int(time.time())
self.type = type
self.data = data
class EventSourcing:
def __init__(self):
self.event_store = EventStore()
def add_event(self, event):
self.event_store.store_event(event)
def get_events(self):
return self.event_store.get_events()
def get_event_by_id(self, event_id):
return self.event_store.get_event_by_id(event_id)
class EventStore:
def __init__(self):
self.events = []
def store_event(self, event):
self.events.append(event)
def get_events(self):
return self.events
def get_event_by_id(self, event_id):
for event in self.events:
if event.id == event_id:
return event
raise ValueError('Event not found')
class Auditor:
def __init__(self, event_store):
self.event_store = event_store
def log_events(self):
for event in self.event_store.get_events():
print(json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
account = BankAccount()
auditor = Auditor(account.event_sourcing.event_store)
account.deposit(100)
account.withdraw(50)
account.deposit(75)
print('Current balance:', account.get_balance())
print('All events:')
auditor.log_events()
event_id = account.get_events()[1].id
event = account.get_event_by_id(event_id)
print('Event details:', json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
示例代码
BankAccount
类用来演示如何应用事件来重建实体状态,蕴含balance
属性,反对deposit
(贷款) 和withdraw
(取款)两种操作。Event
类有类型和数据属性。EventSourcing
类定义了 BankAccount 的event_sourcing
属性EventSourcing
类蕴含event_store
属性作为事件列表。EventStore
类有两个次要办法:store_event()
(在列表中存储事件)和get_events()
(从列表中检索事件)。add_event
办法向event_store
增加新事件。get_events
和get_event_by_id
办法能够通过 ID 检索所有事件或特定事件。deposit
和withdraw
办法创立具备惟一 ID、工夫戳和字典 (蕴含操作信息,在本例中为操作类型和金额) 的新Event
对象,事件被增加到BankAccount
实例的event_sourcing
属性中。- 每次进行
deposit
和withdraw
时,都会创立相应事件,并通过EventStore
类将其存储在事件存储中。 get_balance
办法返回帐户以后余额。replay_events()
办法从事件存储中检索所有事件,并计算以后余额。遍历事件存储中的所有事件,并依据每个事件的类型和数据更新BankAccount
的balance
属性。Auditor
类监听存储在事件存储库中的所有事件,并在终端上输入相应 log。- 以 JSON 格局打印以后余额和所有事件,通过 ID 检索特定事件并打印其详细信息。
- 事件源模式是创立事件来示意对系统状态的更改,将这些事件存储在事件存储中,并重播事件以重建零碎以后状态。
长处
- 捕捉、存储所有事件,以便进行审计和保障一致性
- 能够重播所有事件以创立数据的不同视图
- 通过存储事件来解决大量数据
- 能够重播事件以将零碎复原到以前的状态
- 事件日志能够作为一段时间内零碎行为的文档,使其更容易保护和扩大。
毛病
- 实现可能比较复杂,特地是在处理事件设计和数据迁徙的复杂性时
- 存储所有事件须要更多存储空间
- 必须重放所有事件以确定以后状态,可能导致额定性能开销
实用场景
- 记录交易和财务事项
- 记录衰弱事件和医疗程序
- 记录订单事件和付款事件
注意事项
- 事件设计 —— 以细粒度形式捕获零碎状态的变动。事件应该是不可变的,这意味着事件被创立后不能被批改。事件的设计应该反对简略的查问和剖析。
- 存储需要 —— 所有对系统状态的更改都以事件序列的模式存储。存储空间显著大于传统数据库。
- 数据迁徙 —— 提前打算数据迁徙,并思考如何将数据从旧零碎迁徙到新的事件源零碎。
9. 命令查问职责拆散(Command Query Responsibility Segregation, CQRS)
- 将读写操作拆散到独自的服务或模型中
- 命令: 扭转零碎状态
- 查问: 返回数据
-
波及如下步骤:
- 用户向零碎收回读取或写入数据的申请
- 如果申请是命令(写操作),则将该命令发送给命令服务,命令服务解决申请并更新零碎状态。
- 命令服务更新写入模型,其中蕴含零碎的以后状态,并创立形容更改的事件。事件被增加到事件流中,事件流是零碎中产生的所有事件的日志。
- 命令服务将事件公布到音讯代理,音讯代理将事件传递给感兴趣的订阅者。
- 如果申请是查问(读操作),则将查问发送给查问服务,查问服务从读模型中检索数据。
- 查问服务从读模型中检索数据并将其返回给用户。
- 如果用户想执行另一个写操作,则从步骤 2 开始反复该过程。
- 如果用户想要执行读操作,则从步骤 5 开始反复该过程。
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class Command(ABC):
pass
class CreateProductCommand(Command):
def __init__(self, name: str, price: float):
self.name = name
self.price = price
class UpdateProductCommand(Command):
def __init__(self, product_id: str, name: Optional[str] = None, price: Optional[float] = None):
self.product_id = product_id
self.name = name
self.price = price
class DeleteProductCommand(Command):
def __init__(self, product_id: str):
self.product_id = product_id
class Query(ABC):
pass
class GetProductQuery(Query):
def __init__(self, product_id: str):
self.product_id = product_id
class GetAllProductsQuery(Query):
pass
class Product:
def __init__(self, id: str, name: str, price: float):
self.id = id
self.name = name
self.price = price
class ProductRepository:
def __init__(self):
self.products = []
def create(self, name: str, price: float) -> Product:
product = Product(str(len(self.products) + 1), name, price)
self.products.append(product)
return product
def get(self, id: str) -> Optional[Product]:
for product in self.products:
if product.id == id:
return product
return None
def get_all(self) -> List[Product]:
return self.products
def update(self, id: str, name: Optional[str] = None, price: Optional[float] = None) -> Optional[Product]:
for product in self.products:
if product.id == id:
if name is not None:
product.name = name
if price is not None:
product.price = price
return product
return None
def delete(self, id: str) -> bool:
for product in self.products:
if product.id == id:
self.products.remove(product)
return True
return False
class ProductCommandHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, command: Command) -> Optional[Product]:
if isinstance(command, CreateProductCommand):
return self.repository.create(command.name, command.price)
elif isinstance(command, UpdateProductCommand):
return self.repository.update(command.product_id, command.name, command.price)
elif isinstance(command, DeleteProductCommand):
success = self.repository.delete(command.product_id)
return success
class ProductQueryHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, query: Query) -> Optional[Any]:
if isinstance(query, GetProductQuery):
return self.repository.get(query.product_id)
elif isinstance(query, GetAllProductsQuery):
return self.repository.get_all()
class ProductService:
def __init__(self, command_handler: ProductCommandHandler, query_handler: ProductQueryHandler):
self.command_handler = command_handler
self.query_handler = query_handler
def create_product(self, name: str, price: float) -> Product:
command = CreateProductCommand(name, price)
return self.command_handler.handle(command)
def get_product(self, id: str) -> Optional[Product]:
query = GetProductQuery(id)
return self.query_handler.handle(query)
def get_all_products(self) -> List[Product]:
query = GetAllProductsQuery()
return self
示例代码
- 一个产品管理系统。
- 抽象类
Command
和Query
别离由具体类实现。 - 3 个命令类的实现:
CreateProductCommand
、UpdateProductCommand
、DeleteProductCommand
CreateProductCommand
创立产品UpdateProductCommand
更新产品DeleteProductCommand
删除产品- 2 个查问类的实现:
GetProductQuery
和GetAllProductQuery
GetProductQuery
检索对于特定产品的信息GetAllProductQuery
检索所有产品的信息Product
类示意一个产品,蕴含id
、name
和price
ProductRepository
类解决产品数据的持久性,具备创立、检索、更新和删除产品的办法ProductCommandHandler
类解决命令并将ProductRepository
作为依赖项ProductQueryHandler
类解决查问并将ProductRepository
作为依赖项- 两个
handle
办法负责接受命令或查问,并返回适当的响应 ProductService
类作为客户端与产品管理系统交互的入口,将ProductCommandHandler
和ProductQueryHandler
作为依赖项,并公开用于创立、检索和列出产品的办法,这些办法只是对适当命令或查问的包装,并将其传递给相应的处理程序。
长处
- 拆散读写模型以别离优化两个过程,能够取得更好的性能。
- 拆散读写模型使代码更容易保护。
- 此模式可针对特定用例进行优化。
毛病
- 零碎的设计和实现比较复杂
- 比传统的整体架构须要更多工夫和资源
- 在产生写操作和更新读模型之间可能存在提早,从而导致只能保障最终一致性
实用场景
- 具备简单域逻辑和高读写负载的应用程序
- 具备不同用户界面的零碎,例如挪动和 web 应用程序,其中每个界面都有本人特定的读取要求
- 电子商务系统中的产品目录治理和订单治理
- 医疗保健零碎中的患者数据检索和数据输出
- 金融交易零碎中的实时市场数据检索和订单执行
CQRS 和事件驱动的联合
参考文献
Saga pattern
Microservices Pattern: Sagas
Saga Pattern
Saga Pattern Microservices
Saga Pattern for Microservices Distributed Transactions
Microservice Design Pattern – Saga
Event Driven Saga Pattern
How to Use Saga Pattern in Microservices
Saga Orchestration for Microservices Using the Outbox Pattern
Saga Without the Headaches
Event Sourcing
Event Sourcing – why a dedicated event store?
Beginner’s Guide to Event Sourcing
Microservices Pattern: Event Sourcing
Event Sourcing pattern
Event Sourcing
Event Sourcing explained
CQRS Event Sourcing JAVA
Introduction to CQRS
CQRS Pattern
bliki: CQRS
Microservices Pattern: Command Query Responsibility Segregation (CQRS)
A Beginner’s Guide to CQRS
CQRS Desgin Pattern in Microservices Architecture
The Command and Query Responsibility Segregation(CQRS)
Event Driven CQRS Pattern
CQRS Pattern
CQRS Software Architecture Pattern: The Good, the Bad, and the Ugly
你好,我是俞凡,在 Motorola 做过研发,当初在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓重的趣味,平时喜爱浏览、思考,置信继续学习、一生成长,欢送一起交流学习。\
微信公众号:DeepNoMind
本文由 mdnice 多平台公布