Author: Li Pengbo
A member of the 爱可生 DBA team, mainly responsible for MySQL troubleshooting and SQL review and optimization. Dedicated to technology and committed to customers.
Source: original contribution
- Produced by the 爱可生 open source community. Original content may not be used without authorization; for reprints, please contact the editors and credit the source.
When a transaction is committed with MySQL's commit
command, the server internally performs a two-phase commit (Prepare and Commit). Based on MySQL 8.0.33, this article walks through the source code of the two-phase commit to show what actually happens while a transaction is being committed.
I. Prepare Phase
1. Binlog Prepare
Fetch the largest sequence number (timestamp) of the previously committed transaction.
2. InnoDB Prepare
- Set the transaction state to prepared;
- Release GAP locks for isolation levels at or below READ COMMITTED;
- Change the state of the undo log segment from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED;
- Write the transaction XID into the undo log.
II. Commit Phase
1. Stage 0
Guarantee the commit order on replica instances.
2. Flush Stage
- Flush the redo log according to the innodb_flush_log_at_trx_commit parameter
  - Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues
  - The storage engine flushes the redo log of prepared transactions according to innodb_flush_log_at_trx_commit
  - Unblock the replica threads that preserve commit order
- Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID
- Flush binlog_cache_mngr
  - Flush the stmt_cache
  - Flush the trx_cache
    - Generate last_committed and sequence_number
    - Flush the GTID log event
    - Flush the data in the trx_cache into the binlog cache
    - Record the binlog position the transaction will have after commit
    - Increment the prepared-XID counter
- Run the after_flush hook via instrumentation, registering the already-flushed binlog file and position with the semisynchronous replication plugin
- If sync_binlog != 1, update the binlog position in the flush stage and broadcast the update signal; the dump threads of replicas learn about binlog updates from this signal (see the sketch after this outline)
3. Sync Stage
- According to the sync_binlog setting, wait before flushing and then call fsync() to flush the binlog
- If sync_binlog == 1, update the binlog position in the sync stage and broadcast the update signal; the dump threads of replicas learn about binlog updates from this signal
4. Commit Stage
- after_sync hook (the semisynchronous replication after_sync hook)
- Update the global m_max_committed_transaction (used as last_committed by later transactions) and initialize the sequence number of the transaction context
- Binlog-layer commit: does nothing
- Storage-engine commit
  - Pre-allocate an update undo segment for persisting the GTID
  - Update the update_time of the modified tables in the data dictionary
  - Allocate the mini-transaction handle and buffer
  - Update the undo state
    - For insert, change the state from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE; for update, change it to TRX_UNDO_TO_PURGE
    - For update transactions, also assign a trx no to the rollback segments and add them to the purge queue
    - Add the update undo log header to the history list and release some in-memory objects
  - Record the binlog position in the system transaction table
  - Close the MVCC read view
  - Persist the GTID
  - Release the insert undo log
  - Wake up the background threads, such as the master thread, purge thread, and page_cleaner
- Update executed_gtid for the whole group of transactions
- After the storage-engine commit, decrement the counter of XIDs in Prepared state
- after_commit hook (the semisynchronous replication after_commit hook)
- Broadcast the m_stage_cond_binlog condition variable to wake up the suspended followers
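Before diving into the code, the interaction between sync_binlog and the binlog-position broadcast described above can be condensed into the following sketch. This is not the server code; the helper names are invented placeholders, and only the control flow mirrors the outline.

```cpp
#include <cstdio>

// Invented placeholders, not server functions.
static void flush_binlog_to_os_cache() { std::puts("write binlog cache to binlog file"); }
static void fsync_binlog()             { std::puts("fsync binlog file"); }
static void broadcast_binlog_update()  { std::puts("signal dump threads: new binlog end pos"); }

// Condensed control flow of the Flush/Sync decisions described above.
void flush_and_sync_sketch(unsigned sync_binlog, unsigned groups_since_last_sync) {
  // Flush stage: write the transaction's events from the binlog cache into the
  // binlog file (OS page cache); the redo log was already flushed before this.
  flush_binlog_to_os_cache();
  if (sync_binlog != 1) {
    // The new position is advertised right after the write, even though it is
    // not guaranteed to be durable yet.
    broadcast_binlog_update();
  }

  // Sync stage: fsync only once every sync_binlog commit groups
  // (sync_binlog == 0: never fsync here, the OS decides when to flush).
  if (sync_binlog > 0 && groups_since_last_sync + 1 >= sync_binlog) {
    fsync_binlog();
    if (sync_binlog == 1) {
      // With sync_binlog = 1 the position is advertised only after the fsync,
      // so dump threads never ship events that a crash could still lose.
      broadcast_binlog_update();
    }
  }
}
```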
The ha_commit_trans
function mainly determines whether GTID information needs to be written, and then starts the two-phase commit:
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {
  /*
    Save transaction owned gtid into table before transaction prepare
    if binlog is disabled, or binlog is enabled and log_replica_updates
    is disabled with slave SQL thread or slave worker thread.
  */
  std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);
  ...
  // Prepare phase
  if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
    error = tc_log->prepare(thd, all);
  ...
  // Commit phase
  if (error || (error = tc_log->commit(thd, all))) {
    ha_rollback_trans(thd, all);
    error = 1;
    goto end;
  }
}
Prepare Phase
The Prepare phase of the two-phase commit is relatively simple. Below is the call stack from the entry point of the commit
command through the Prepare phase, together with what each step does:
|mysql_execute_command
|--trans_commit
|----ha_commit_trans
|------MYSQL_BIN_LOG::prepare
// start binlog prepare and innodb prepare
|--------ha_prepare_low
// Binlog prepare: fetch the largest sequence number (timestamp) of the previous transaction
|----------binlog_prepare
// InnoDB prepare
|----------innobase_xa_prepare
|------------trx_prepare_for_mysql
// 1. call trx_prepare_low
// 2. set the transaction state to Prepared
// 3. release GAP locks for RC and lower isolation levels
// 4. flush the redo log (deferred to the Flush stage of the Commit phase)
|--------------trx_prepare
|----------------trx_prepare_low
// 1. change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED
// 2. write the transaction XID into the undo log
|------------------trx_undo_set_state_at_prepare
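To make the last two steps concrete, here is a minimal, self-contained sketch of what setting the undo state at prepare conceptually does. The types and field names below are simplified stand-ins of my own, not the actual InnoDB structures.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins for the InnoDB undo segment states.
enum undo_state_t { TRX_UNDO_ACTIVE, TRX_UNDO_PREPARED, TRX_UNDO_TO_FREE, TRX_UNDO_TO_PURGE };

// Hypothetical, simplified undo segment header.
struct undo_segment {
  undo_state_t state = TRX_UNDO_ACTIVE;
  uint64_t xid = 0;  // XID recorded so crash recovery can match the trx with the binlog
};

// Conceptual equivalent of trx_undo_set_state_at_prepare: the state change and
// the XID go through the redo log, so after a crash recovery finds the
// transaction in PREPARED state and can decide to commit or roll it back
// depending on whether its XID is present in the binlog.
void set_state_at_prepare(undo_segment &undo, uint64_t xid) {
  assert(undo.state == TRX_UNDO_ACTIVE);
  undo.state = TRX_UNDO_PREPARED;
  undo.xid = xid;
}
```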
Commit Phase
The Commit phase is implemented mainly in the MYSQL_BIN_LOG::ordered_commit
function.
Flush Stage
Let's first look at Stage 0 and Stage 1. Stage 0 is a stage newly added in 8.0, mainly to guarantee commit order for replicas. Stage 1 is the familiar Flush stage, the first of the three sub-stages of the Commit phase:
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
  /*
    Stage #0: ensure that the SQL threads of replicas commit transactions in
    relay log order
  */
  if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
      ending_trans(thd, all) ||
      Commit_order_manager::get_rollback_status(thd)) {
    if (Commit_order_manager::wait(thd)) {
      return thd->commit_error;
    }
  }
  /*
    Stage #1: flushing transactions to binary log

    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
  */
  if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
                   &LOCK_log)) {
    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                          thd->commit_error));
    return finish_commit(thd);
  }

  THD *wait_queue = nullptr, *final_queue = nullptr;
  mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;

  // main processing logic of the Flush stage
  flush_error =
      process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);

  if (flush_error == 0 && total_bytes > 0)
    /*
      flush the binlog cache to the file cache
    */
    flush_error = flush_cache_to_file(&flush_end_pos);

  // later, the sync_binlog parameter decides where the binlog position is
  // updated and where the binlog-update signal is broadcast
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);

  /*
    If the flush finished successfully, we can call the after_flush
    hook. Being invoked here, we have the guarantee that the hook is
    executed before the before/after_send_hooks on the dump thread
    preventing race conditions among these plug-ins.
  */
  if (flush_error == 0) {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    /*
      Run the after_flush hook via instrumentation: register the flushed
      binlog file and position with the semisynchronous replication plugin,
      so they can later be compared against the binlog position acknowledged
      by the slave.
    */
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos))) {
      LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);
      flush_error = ER_ERROR_ON_WRITE;
    }
    // if sync_binlog != 1, update the binlog position in the flush stage and
    // broadcast the update signal; the dump threads of replicas learn about
    // new binlog events from this signal
    if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();
  }
The main processing logic of the Flush stage is concentrated in process_flush_stage_queue
:
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                             bool *rotate_var,
                                             THD **out_queue_var) {
  int no_flushes = 0;
  my_off_t total_bytes = 0;
  mysql_mutex_assert_owner(&LOCK_log);
  // flush the redo log according to the innodb_flush_log_at_trx_commit parameter
  THD *first_seen = fetch_and_process_flush_stage_queue();
  // call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID
  assign_automatic_gtids_to_flush_group(first_seen);
  /* Flush thread caches to binary log. */
  for (THD *head = first_seen; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(current_thd, head);
    /*
      flush the stmt_cache and trx_cache of binlog_cache_mngr.
      Flushing the trx_cache:
      - generate last_committed and sequence_number
      - flush the GTID log event
      - flush the data in the trx_cache into the binlog cache
      - record the binlog position the transaction will have after commit
      - increment the prepared-XID counter
    */
    std::pair<int, my_off_t> result = flush_thread_caches(head);
    total_bytes += result.second;
    if (flush_error == 1) flush_error = result.first;
#ifndef NDEBUG
    no_flushes++;
#endif
  }

  *out_queue_var = first_seen;
  *total_bytes_var = total_bytes;
  if (total_bytes > 0 &&
      (m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
       DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
    *rotate_var = true;
#ifndef NDEBUG
  DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));
  no_flushes = 0;
#endif
  return flush_error;
}
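A side note on GTID generation: an automatically assigned GTID pairs the server's own sidno (derived from server_uuid via get_server_sidno()) with the first free GNO for that sidno. The "first free number" idea can be sketched as follows; this is a simplified model of my own, not the Gtid_state implementation, which walks the gtid_executed intervals rather than a std::set.

```cpp
#include <cstdint>
#include <set>

// Sketch: 'executed' holds the GNOs already committed for the server's own
// sidno. The automatic GNO is the smallest positive integer not yet used.
int64_t first_free_gno(const std::set<int64_t> &executed) {
  int64_t candidate = 1;           // GNO numbering starts at 1
  for (int64_t gno : executed) {   // std::set iterates in ascending order
    if (gno != candidate) break;   // found a gap before 'gno'
    ++candidate;
  }
  return candidate;                // the GTID becomes server_uuid:candidate
}
```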
The call stack for flushing the redo log is as follows:
// fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues,
// flush the transactions to disk, and unblock the replica threads that
// preserve commit order
|fetch_and_process_flush_stage_queue
// the storage engine flushes the redo log of prepared transactions according
// to the innodb_flush_log_at_trx_commit parameter
|--ha_flush_logs
|----innobase_flush_logs
|------log_buffer_flush_to_disk
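For reference, the effect of innodb_flush_log_at_trx_commit on this step can be summarized with the sketch below. The helper names are invented placeholders, not InnoDB functions; only the decision logic is intended to match the parameter's documented behavior.

```cpp
#include <cstdio>

// Placeholder helpers (assumptions, not InnoDB functions).
static void write_log_buffer() { std::puts("write redo log buffer to file"); }
static void fsync_log_file()   { std::puts("fsync redo log file"); }

// Sketch of how innodb_flush_log_at_trx_commit is interpreted when a
// transaction reaches this point.
void flush_redo_at_commit(int innodb_flush_log_at_trx_commit) {
  switch (innodb_flush_log_at_trx_commit) {
    case 0:
      // Do nothing here; a background thread writes and syncs the redo log
      // roughly once per second, so the last second of transactions can be
      // lost on a crash.
      break;
    case 2:
      // Write the log buffer to the OS page cache but skip the fsync; data
      // survives a mysqld crash, not necessarily an OS/power failure.
      write_log_buffer();
      break;
    case 1:
    default:
      // Write and fsync, so the redo log is on disk before commit returns.
      write_log_buffer();
      fsync_log_file();
      break;
  }
}
```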
Sync Stage
The code of the Sync stage is as follows:
  /*
    Stage #2: Syncing binary log file to disk
  */
  if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
                   &LOCK_sync)) {
    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                          thd->commit_error));
    return finish_commit(thd);
  }

  /*
    - sync_counter: number of commit groups accumulated since the last sync
    - get_sync_period(): returns the value of the sync_binlog parameter
    - If the number of commit groups in the sync stage queue is greater than or
      equal to sync_binlog, the current leader calls fsync() to flush the binlog
      (sync_binlog_file(false)). Before the sync it may wait for more commit
      groups to join the queue; the wait is bounded by
      binlog_group_commit_sync_no_delay_count and
      binlog_group_commit_sync_delay, both 0 by default.
    - If the number of commit groups in the sync stage queue is smaller than
      sync_binlog, the current leader neither calls fsync() nor waits.
    - If sync_binlog is 0, every commit group triggers the wait, but no sync is
      performed.
    - If sync_binlog is 1, every commit group triggers the wait and the sync.
  */
  if (!flush_error && (sync_counter + 1 >= get_sync_period()))
    Commit_stage_manager::get_instance().wait_count_or_timeout(
        opt_binlog_group_commit_sync_no_delay_count,
        opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);

  final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
      Commit_stage_manager::SYNC_STAGE);

  if (flush_error == 0 && total_bytes > 0) {
    DEBUG_SYNC(thd, "before_sync_binlog_file");
    std::pair<bool, bool> result = sync_binlog_file(false);
    sync_error = result.first;
  }

  /*
    If sync_binlog == 1, update the binlog position in the sync stage and
    broadcast the update signal; the dump threads of replicas learn about new
    binlog events from this signal. (The position itself was set in the flush
    stage, in process_flush_stage_queue()
      |--flush_thread_caches()
      |----set_trans_pos())
  */
  if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {
    THD *tmp_thd = final_queue;
    const char *binlog_file = nullptr;
    my_off_t pos = 0;
    while (tmp_thd != nullptr) {
      if (tmp_thd->commit_error == THD::CE_NONE) {
        tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
      }
      tmp_thd = tmp_thd->next_to_commit;
    }
    if (binlog_file != nullptr && pos > 0) {
      update_binlog_end_pos(binlog_file, pos);
    }
  }
  DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");
  leave_mutex_before_commit_stage = &LOCK_sync;
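The wait_count_or_timeout() step above boils down to a bounded delay before the fsync. A simplified, self-contained sketch of that idea follows; it is not the server code, and queued_transactions() is a stub standing in for inspecting the sync-stage queue.

```cpp
#include <chrono>
#include <thread>

static int queued_transactions() { return 1; }  // stub for illustration

// Sketch: the leader delays the fsync for at most delay_us microseconds, but
// gives up early once no_delay_count transactions have queued up (0 disables
// the early exit). The real implementation sleeps in small quanta between
// checks rather than blocking for the whole delay.
void wait_count_or_timeout_sketch(int no_delay_count, long delay_us) {
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::microseconds(delay_us);
  while (std::chrono::steady_clock::now() < deadline) {
    if (no_delay_count != 0 && queued_transactions() >= no_delay_count) break;
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }
}
```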
Commit Stage
The code of the Commit stage is as follows:
  /*
    Stage #3: Commit all transactions in order.
  */
commit_stage:
  /* binlog_order_commits: whether to commit in order, i.e. keep the commit
     order of the redo log and the binlog consistent */
  if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
    if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
                     leave_mutex_before_commit_stage, &LOCK_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::COMMIT_STAGE);
    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););

    if (flush_error == 0 && sync_error == 0)
      /* after_sync hook */
      sync_error = call_after_sync_hook(commit_queue);

    /*
      main processing logic of the Commit stage
    */
    process_commit_stage_queue(thd, commit_queue);

    /**
     * After commit stage
     */
    if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,
                     commit_queue, &LOCK_commit, &LOCK_after_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }

    THD *after_commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::AFTER_COMMIT_STAGE);
    /* after_commit hook */
    process_after_commit_stage_queue(thd, after_commit_queue);

    final_queue = after_commit_queue;
    mysql_mutex_unlock(&LOCK_after_commit);
  } else {
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }

  /* broadcast the m_stage_cond_binlog condition variable to wake up the
     suspended followers */
  Commit_stage_manager::get_instance().signal_done(final_queue);
  DBUG_EXECUTE_IF("block_leader_after_delete", {
    const char action[] = "now SIGNAL leader_proceed";
    assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));
  };);

  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
  */
  (void)finish_commit(thd);
  DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");
  return thd->commit_error == THD::CE_COMMIT_ERROR;
}
The main processing logic of the Commit stage is concentrated in the process_commit_stage_queue
function:
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
  mysql_mutex_assert_owner(&LOCK_commit);
#ifndef NDEBUG
  thd->get_transaction()->m_flags.ready_preempt =
      true;  // formality by the leader
#endif
  for (THD *head = first; head; head = head->next_to_commit) {
    DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",
                         head->thread_id(), head->commit_error,
                         YESNO(head->tx_commit_pending)));
    DBUG_EXECUTE_IF(
        "block_leader_after_delete",
        if (thd != head) { DBUG_SET("+d,after_delete_wait"); };);
    /*
      If flushing failed, set commit_error for the session, skip the
      transaction and proceed with the next transaction instead. This
      will mark all threads as failed, since the flush failed.

      If flush succeeded, attach to the session and commit it in the
      engines.
    */
#ifndef NDEBUG
    Commit_stage_manager::get_instance().clear_preempt_status(head);
#endif
    /*
      Update the global m_max_committed_transaction (used as last_committed by
      later transactions) and initialize the sequence number of this
      transaction's context.
    */
    if (head->get_transaction()->sequence_number != SEQ_UNINIT) {
      mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);
      m_dependency_tracker.update_max_committed(head);
      mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);
    }
    /*
      Flush/Sync error should be ignored and continue
      to commit phase. And thd->commit_error cannot be
      COMMIT_ERROR at this moment.
    */
    assert(head->commit_error != THD::CE_COMMIT_ERROR);
    Thd_backup_and_restore switch_thd(thd, head);
    bool all = head->get_transaction()->m_flags.real_commit;
    assert(!head->get_transaction()->m_flags.commit_low ||
           head->get_transaction()->m_flags.ready_preempt);
    // binlog commit and InnoDB commit
    ::finish_transaction_in_engines(head, all, false);
    DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",
                         head->commit_error, YESNO(head->tx_commit_pending)));
  }

  /*
    Lock the sidno and update executed_gtid for the whole group of transactions.
    - If the binlog is disabled, @@GLOBAL.GTID_PURGED is derived from
      executed_gtids, so @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED are
      always identical and lost_gtids does not need to be recorded.
    - If the binlog is enabled but log_replica_updates is disabled, the slave
      SQL thread or slave worker threads add their own GTIDs to executed_gtids
      and lost_gtids.
  */
  gtid_state->update_commit_group(first);

  for (THD *head = first; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(thd, head);
    auto all = head->get_transaction()->m_flags.real_commit;
    // only for external XA transactions: mark the transaction as prepared in
    // the transaction coordinator at the storage-engine layer
    trx_coordinator::set_prepared_in_tc_in_engines(head, all);
    /*
      After the storage-engine commit, decrement the counter of XIDs in
      Prepared state.
    */
    if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);
  }
}
The ::finish_transaction_in_engines function contains the main storage-engine commit logic; the relevant call stack is as follows:
|::finish_transaction_in_engines
|--trx_coordinator::commit_in_engines
|----ha_commit_low
// binlog-layer commit: does nothing (empty function)
|------binlog_commit
// storage-engine commit
|------innobase_commit
|--------innobase_commit_low
|----------trx_commit_for_mysql
// pre-allocate an update undo segment for persisting the GTID
|------------trx_undo_gtid_add_update_undo
// update the update_time of the modified tables in the data dictionary
|------------trx_update_mod_tables_timestamp
// allocate the mini-transaction handle and buffer
|------------trx_commit
// commit the mini-transaction
|--------------trx_commit_low
|----------------trx_write_serialisation_history
// update the undo state:
//   for insert, change the state from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE;
//   for update, change it to TRX_UNDO_TO_PURGE
//   for update transactions, also assign a trx no to the rollback segments and add them to the purge queue
|------------------trx_undo_set_state_at_finish
// add the update undo log header to the history list and release some in-memory objects
|------------------trx_undo_update_cleanup
// record the binlog position in the system transaction table
|------------------trx_sys_update_mysql_binlog_offset
|----------------trx_commit_in_memory
// - close the MVCC read view
// - persist the GTID
// - release the insert undo log
// - wake up the background threads, such as the master thread, purge thread, page_cleaner
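A tiny sketch of the state transition that trx_undo_set_state_at_finish performs, again with simplified stand-in types of my own rather than the InnoDB structures:

```cpp
// Simplified stand-ins for the InnoDB undo segment states.
enum undo_state_t { TRX_UNDO_ACTIVE, TRX_UNDO_PREPARED, TRX_UNDO_TO_FREE, TRX_UNDO_TO_PURGE };

// Conceptual equivalent of trx_undo_set_state_at_finish: insert undo can be
// freed right away (nobody needs it once the transaction commits), while
// update undo must be kept for MVCC readers and is handed to the purge
// subsystem via the history list.
undo_state_t state_at_finish(bool is_insert_undo) {
  return is_insert_undo ? TRX_UNDO_TO_FREE : TRX_UNDO_TO_PURGE;
}
```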
Stage Transitions
The stage-transition logic is implemented mainly by the enroll_for
function called from change_stage
:
- The first thread to enter the queue becomes the leader of the whole transaction group
- Threads that enter the queue afterwards become followers of the group
- A follower thread suspends itself and waits to be woken up by the m_stage_cond_binlog condition variable
- The leader commits the whole group of transactions; once the commit is done, it signals m_stage_cond_binlog to wake up the suspended followers
- The core of the queue transition is that a thread first enters the queue of the next stage, then releases the mutex of the previous stage, and only then acquires the mutex of the next stage (a simplified, self-contained sketch of this leader/follower pattern follows the enroll_for listing below)
  - The Flush Stage acquires no stage mutex
  - The Sync Stage needs LOCK_sync
  - The Commit Stage needs the LOCK_commit mutex
  - The After Commit Stage needs the LOCK_after_commit mutex
bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
                                      mysql_mutex_t *stage_mutex,
                                      mysql_mutex_t *enter_mutex) {
  // if the queue is empty, this thread becomes the leader
  thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();
  bool leader = this->append_to(stage, thd);
  /*
    If the FLUSH stage queue (BINLOG_FLUSH_STAGE or COMMIT_ORDER_FLUSH_STAGE)
    is not empty, this thread cannot become the leader. The leader needs to
    acquire enter_mutex.
  */
  if (leader) {
    if (stage == COMMIT_ORDER_FLUSH_STAGE) {
      leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();
      /*
        Leader handover logic. A session can sit in one of five queues:
        - Binlog flush queue: flush the redo log and write the binlog file
        - Commit order flush queue: for commit-order transactions that only
          take part in the early portion of the group commit, up to the
          storage-engine flush
        - Sync queue: sync the transaction
        - Commit queue: commit the transaction
        - After commit queue: call the transaction's after_commit hook
      */
    } else if (stage == BINLOG_FLUSH_STAGE &&
               // the current thread is the first one in BINLOG_FLUSH_STAGE,
               // but COMMIT_ORDER_FLUSH_STAGE already has a leader; the
               // current thread suspends and waits to be woken by a signal
               // from the COMMIT_ORDER_FLUSH_STAGE leader
               !m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {
      /*
        The current transaction is the first thread in the binlog queue, but
        the commit order queue already has a leader. In that case the current
        thread becomes the leader and the commit order leader turns into a
        follower. The leader is changed because the commit order leader cannot
        act as the leader of the binlog threads: commit order threads must
        leave the commit group before the binlog threads finish their work.
        The handover from leader to follower works as follows:
        1. A commit order thread enters the flush stage first and becomes the
           commit order leader.
        2. The commit order leader tries to acquire the stage mutex, which may
           take a while, e.g. if the mutex is still held by the leader of the
           previous commit group.
        3. Meanwhile a binlog thread enters the flush stage. It has to wait for
           a signal from the commit order leader.
        4. Once the commit order leader has acquired the stage mutex, it checks
           whether any binlog thread has entered the flush stage; if so, it
           hands leadership over.
        5. The commit order leader sends a signal to the binlog leader, becomes
           a follower, and waits for the commit to finish (just like the other
           followers).
        6. The binlog leader is woken up by the commit order leader's signal
           and performs the group commit.
      */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_leader,
                        &m_queue_lock[BINLOG_FLUSH_STAGE]);
    }
  }
  unlock_queue(stage);

  /*
    Notify the next transaction waiting on commit order that it may enter the
    queue.
  */
  if (stage == BINLOG_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");
  } else if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
  }

  /*
    When entering the first stage, no stage mutex needs to be held.
  */
  if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);

  /*
    If the queue was not empty, the current thread is a follower and waits for
    the leader to process the queue.
  */
  if (!leader) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");
    mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
    thd->get_transaction()->m_flags.ready_preempt = true;
    if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);
#endif
    // tx_commit_pending: the transaction's commit has not finished yet
    while (thd->tx_commit_pending) {
      if (stage == COMMIT_ORDER_FLUSH_STAGE) {
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      } else {
        // the follower thread suspends here and is woken up after the leader
        // has finished committing the whole group
        mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
      }
    }
    mysql_mutex_unlock(&m_lock_done);
    return false;
  }

#ifndef NDEBUG
  if (stage == Commit_stage_manager::SYNC_STAGE)
    DEBUG_SYNC(thd, "bgc_between_flush_and_sync");
#endif

  bool need_lock_enter_mutex = false;
  if (leader && enter_mutex != nullptr) {
    /*
      If LOCK_log has already been acquired because the binlog is being
      rotated, there is no need to acquire enter_mutex again.
    */
    need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
                              enter_mutex == mysql_bin_log.get_log_lock());
    if (need_lock_enter_mutex)
      mysql_mutex_lock(enter_mutex);
    else
      mysql_mutex_assert_owner(enter_mutex);
  }

  // leader handover logic
  if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_commit_order_thread_becomes_leader");
    lock_queue(stage);
    if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {
      if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);
      THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();
      binlog_leader->tx_commit_pending = false;
      mysql_cond_signal(&m_stage_cond_leader);
      unlock_queue(stage);
      mysql_mutex_lock(&m_lock_done);
      /* wait for signal from binlog leader */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_commit_order_leader_waits_for_binlog_leader");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      mysql_mutex_unlock(&m_lock_done);
      leader = false;
      return leader;
    }
  }
  return leader;
}
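To close, here is a minimal, self-contained sketch of the leader/follower pattern that enroll_for implements, stripped of the MySQL-specific details. The queue, mutex, and condition-variable names below are invented for this sketch; every waiter carries its own pending flag, much like THD::tx_commit_pending in the server.

```cpp
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Toy model of the leader/follower handoff used by binlog group commit: the
// first session to enter the queue becomes the leader, drains the queue,
// "commits" all queued transactions as one group, and then wakes the followers.
struct Waiter {
  int trx_id;
  bool pending = true;  // cleared by the leader once the group is committed
};

class ToyCommitStage {
 public:
  void commit(int trx_id) {
    Waiter self{trx_id};
    std::unique_lock<std::mutex> lk(m_lock);
    const bool leader = m_queue.empty();  // first in the queue => leader
    m_queue.push_back(&self);
    if (!leader) {
      // Follower: suspend until the leader reports our commit as done.
      m_done.wait(lk, [&] { return !self.pending; });
      return;
    }
    // Leader: grab the whole queue and commit it as one group.
    std::vector<Waiter *> group;
    group.swap(m_queue);            // a new queue (and leader) can now form
    lk.unlock();                    // do the actual work outside the queue lock
    for (Waiter *w : group) std::printf("commit trx %d\n", w->trx_id);
    lk.lock();
    for (Waiter *w : group) w->pending = false;
    m_done.notify_all();            // wake the followers of this group
  }

 private:
  std::mutex m_lock;
  std::condition_variable m_done;
  std::vector<Waiter *> m_queue;
};

int main() {
  ToyCommitStage stage;
  std::vector<std::thread> sessions;
  for (int i = 0; i < 4; ++i)
    sessions.emplace_back([&stage, i] { stage.commit(i); });
  for (auto &t : sessions) t.join();
  return 0;
}
```

In the server the picture is richer: there are separate queues and mutexes per stage (LOCK_log, LOCK_sync, LOCK_commit, LOCK_after_commit), and a commit-order leader can hand leadership over to a binlog leader, but the basic enqueue/suspend/notify structure is the same.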