Author: Li Pengbo
A member of the ActionSky (爱可生) DBA team, mainly responsible for MySQL troubleshooting and SQL review and optimization. Dedicated to technology and committed to customers.
Source: original contribution
- Produced by the ActionSky open source community. Original content may not be used without authorization; for reprints please contact the editor and credit the source.
When MySQL commits a transaction with the COMMIT command, it internally performs a two-phase commit (Prepare and Commit). Based on MySQL 8.0.33, this article analyzes the source code of MySQL's two-phase commit and walks you through what happens while a transaction is being committed.
I. Prepare Phase
1. Binlog Prepare
Fetch the maximum sequence number (logical timestamp) of the previous transaction.
2. InnoDB Prepare
- Set the transaction state to prepared;
- Release GAP locks for isolation levels at or below READ COMMITTED;
- Change the state of the undo log segment from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED;
- Write the transaction XID into the undo log.
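Conceptually, the Prepare and Commit phases summarized above follow the classic two-phase commit pattern across the binlog and InnoDB: every participant must prepare successfully before any participant commits. The following minimal, self-contained C++ sketch (not MySQL code; Participant and two_phase_commit are invented names) only illustrates that pattern.
// A minimal sketch of two-phase commit across several "engines" (illustration only).
#include <iostream>
#include <string>
#include <vector>

struct Participant {
  std::string name;
  bool prepare() {                      // e.g. binlog_prepare / innobase_xa_prepare
    std::cout << name << ": prepared\n";
    return true;
  }
  void commit() {                       // e.g. binlog_commit / innobase_commit
    std::cout << name << ": committed\n";
  }
  void rollback() {
    std::cout << name << ": rolled back\n";
  }
};

bool two_phase_commit(std::vector<Participant> &participants) {
  for (auto &p : participants)          // phase 1: prepare everyone
    if (!p.prepare()) {
      for (auto &q : participants) q.rollback();
      return false;
    }
  for (auto &p : participants) p.commit();  // phase 2: commit everyone
  return true;
}

int main() {
  std::vector<Participant> engines{{"binlog"}, {"innodb"}};
  return two_phase_commit(engines) ? 0 : 1;
}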
II. Commit Phase
1. Stage 0
Guarantee the commit order on replica instances.
2. Flush Stage
- Flush the redo log to disk according to the innodb_flush_log_at_trx_commit parameter
  - Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues
  - The storage engine flushes the redo logs of prepared transactions according to innodb_flush_log_at_trx_commit
  - No longer block the replica's preserve-commit-order execution
- Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID
- Flush binlog_cache_mngr
  - Flush stmt_cache
  - Flush trx_cache
    - Generate last_committed and sequence_number
    - Flush the GTID log event
    - Flush the data in trx_cache into the binlog cache
    - Record the binlog position of the committed transaction
    - Increment the prepared XID counter
- Instrumented call of after_flush, which registers the already-flushed binlog file and position with the semi-synchronous replication plugin
- If sync_binlog != 1, update the binlog end position in the flush stage and broadcast the update signal, so the replica dump threads can notice the new binlog events
3. Sync Stage
- According to the sync_binlog parameter, wait before flushing and then call fsync() to flush the binlog to disk
- If sync_binlog == 1, update the binlog end position in the sync stage and broadcast the update signal, so the replica dump threads can notice the new binlog events
4. Commit Stage
- after_sync hook (the semi-synchronous replication after_sync hook)
- Update the global m_max_committed_transaction (used as last_committed for later transactions) and initialize the sequence number of the transaction context
- Binlog-layer commit, which does nothing
- Storage-engine-layer commit
  - Pre-allocate the update undo segment used to persist the GTID
  - Update the update_time of the modified tables in the data dictionary
  - Allocate the mini-transaction handle and buffer
  - Update the undo state
    - For inserts, change the state from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE; for updates, change it to TRX_UNDO_TO_PURGE
    - For updates, additionally assign a trx no to the rollback segments and add them to the purge queue
  - Add the update undo log header to the history list and free some in-memory objects
  - Record the binlog position in the system transaction table
  - Close the MVCC read view
  - Persist the GTID
  - Free the insert undo log
  - Wake up background threads to do their work, such as the master thread, purge thread, and page_cleaner
- Update executed_gtid for the whole group of transactions
- After the storage-engine commit, decrement the counter of XIDs in prepared state
- after_commit hook (the semi-synchronous replication after_commit hook)
- Broadcast the m_stage_cond_binlog condition variable to wake up the suspended followers
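Putting the Commit-phase summary together, the group commit pipeline pushes one group of transactions through Flush, Sync, and Commit in order. The minimal standalone sketch below (not MySQL code; Transaction, flush_stage, and the other names are invented) shows only that ordering: many transactions are flushed, a single fsync() covers the whole group, and the group is then committed in the engine.
// A minimal sketch of the Flush -> Sync -> Commit pipeline (illustration only).
#include <cstdio>
#include <vector>

struct Transaction { unsigned id; };

void flush_stage(std::vector<Transaction> &group) {
  // write every transaction's binlog cache into the binlog file (not yet fsynced)
  for (auto &t : group) std::printf("flush trx %u to binlog file\n", t.id);
}

void sync_stage(const std::vector<Transaction> &group) {
  // one fsync() covers the whole group: this is what makes group commit cheap
  std::printf("fsync binlog for %zu transactions\n", group.size());
}

void commit_stage(std::vector<Transaction> &group) {
  // commit in the storage engine in binlog order
  for (auto &t : group) std::printf("engine commit trx %u\n", t.id);
}

int main() {
  std::vector<Transaction> group{{1}, {2}, {3}};
  flush_stage(group);
  sync_stage(group);
  commit_stage(group);
  return 0;
}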
The ha_commit_trans function mainly decides whether GTID information needs to be written, and then starts the two-phase commit:
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {
/*
Save transaction owned gtid into table before transaction prepare
if binlog is disabled, or binlog is enabled and log_replica_updates
is disabled with slave SQL thread or slave worker thread.
*/
std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);
...
// Prepare phase
if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
error = tc_log->prepare(thd, all);
...
// Commit phase
if (error || (error = tc_log->commit(thd, all))) {
ha_rollback_trans(thd, all);
error = 1;
goto end;
}
}
Prepare Phase
The Prepare phase of two-phase commit is relatively simple. Below is the call stack from the COMMIT command entry point through the Prepare phase, along with what each step does:
|mysql_execute_command
|--trans_commit
|----ha_commit_trans
|------MYSQL_BIN_LOG::prepare
// Start binlog prepare and InnoDB prepare
|--------ha_prepare_low
// Binlog prepare: fetch the maximum sequence number (logical timestamp) of the previous transaction
|----------binlog_prepare
// InnoDB prepare
|----------innobase_xa_prepare
|------------trx_prepare_for_mysql
// 1. Call trx_prepare_low
// 2. Set the transaction state to Prepared
// 3. Release GAP locks for isolation levels at or below READ COMMITTED
// 4. Flush the redo log (now deferred to the Flush stage of the Commit phase)
|--------------trx_prepare
|----------------trx_prepare_low
// 1. Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED
// 2. Write the transaction XID into the undo log
|------------------trx_undo_set_state_at_prepare
Commit Phase
The Commit phase is implemented mainly in the MYSQL_BIN_LOG::ordered_commit function.
Flush Stage
Let's first look at Stage 0 and Stage 1. Stage 0 is a new stage introduced in 8.0, mainly to guarantee commit order on replicas. Stage 1 is the well-known Flush stage, the first of the three sub-stages of the Commit phase:
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
/*
Stage #0: ensure the replica's SQL threads commit transactions in the same order as in the relay log
*/
if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
ending_trans(thd, all) ||
Commit_order_manager::get_rollback_status(thd)) {
if (Commit_order_manager::wait(thd)) {
return thd->commit_error;
}
}
/*
Stage #1: flushing transactions to binary log
While flushing, we allow new threads to enter and will process
them in due time. Once the queue was empty, we cannot reap
anything more since it is possible that a thread entered and
appointed itself leader for the flush phase.
*/
if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
&LOCK_log)) {
DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
thd->commit_error));
return finish_commit(thd);
}
THD *wait_queue = nullptr, *final_queue = nullptr;
mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;
my_off_t flush_end_pos = 0;
bool update_binlog_end_pos_after_sync;
// Main processing logic of the Flush stage
flush_error =
process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
if (flush_error == 0 && total_bytes > 0)
/*
Flush the binlog cache to the file cache
*/
flush_error = flush_cache_to_file(&flush_end_pos);
// Depending on the sync_binlog parameter, decide later where the binlog end position is updated and the binlog update signal is broadcast
update_binlog_end_pos_after_sync = (get_sync_period() == 1);
/*
If the flush finished successfully, we can call the after_flush
hook. Being invoked here, we have the guarantee that the hook is
executed before the before/after_send_hooks on the dump thread
preventing race conditions among these plug-ins.
*/
if (flush_error == 0) {
const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
assert(flush_end_pos != 0);
/*
Instrumented call of after_flush: register the already-flushed binlog file and position with the
semi-synchronous replication plugin, so it can later be compared against the binlog position
acknowledged by the slave.
*/
if (RUN_HOOK(binlog_storage, after_flush,
(thd, file_name_ptr, flush_end_pos))) {
LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);
flush_error = ER_ERROR_ON_WRITE;
}
// If sync_binlog != 1, update the binlog end position in the flush stage and broadcast the
// update signal, so the replica dump threads can notice the new binlog events
if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();
}
The main processing logic of the flush stage is in process_flush_stage_queue:
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var) {
int no_flushes = 0;
my_off_t total_bytes = 0;
mysql_mutex_assert_owner(&LOCK_log);
// Flush the redo log to disk according to the innodb_flush_log_at_trx_commit parameter
THD *first_seen = fetch_and_process_flush_stage_queue();
// Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate GTIDs
assign_automatic_gtids_to_flush_group(first_seen);
/* Flush thread caches to binary log. */
for (THD *head = first_seen; head; head = head->next_to_commit) {
Thd_backup_and_restore switch_thd(current_thd, head);
/*
Flush the stmt_cache and trx_cache of binlog_cache_mngr. Flushing the trx_cache:
- generates last_committed and sequence_number
- flushes the GTID log event
- flushes the data in trx_cache into the binlog cache
- records the binlog position of the committed transaction
- increments the prepared XID counter
*/
std::pair<int, my_off_t> result = flush_thread_caches(head);
total_bytes += result.second;
if (flush_error == 1) flush_error = result.first;
#ifndef NDEBUG
no_flushes++;
#endif
}
*out_queue_var = first_seen;
*total_bytes_var = total_bytes;
if (total_bytes > 0 &&
(m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
*rotate_var = true;
#ifndef NDEBUG
DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));
no_flushes = 0;
#endif
return flush_error;
}
The call stack for flushing the redo log is as follows (a short sketch of the usual innodb_flush_log_at_trx_commit semantics follows the stack):
// Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues, flush the transactions to disk, and no longer block the replica's preserve-commit-order execution
|fetch_and_process_flush_stage_queue
// The storage engine flushes the redo logs of prepared transactions to disk according to the innodb_flush_log_at_trx_commit parameter
|--ha_flush_logs
|----innobase_flush_logs
|------log_buffer_flush_to_disk
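As a side note on the innodb_flush_log_at_trx_commit values referenced above, the following standalone sketch (not MySQL code; all names are invented) encodes their commonly documented meaning: 1 writes and fsyncs the redo log at commit, 2 only writes it to the OS page cache, and 0 leaves both to the background flush.
// A minimal sketch of the redo-log behavior controlled by innodb_flush_log_at_trx_commit.
#include <cstdio>

enum class RedoAction { kNothing, kWriteOnly, kWriteAndFsync };

// Decide what to do with the redo log buffer when a transaction commits.
RedoAction redo_action_at_commit(int innodb_flush_log_at_trx_commit) {
  switch (innodb_flush_log_at_trx_commit) {
    case 0: return RedoAction::kNothing;         // left to the background flush (roughly once per second)
    case 2: return RedoAction::kWriteOnly;       // write to the OS page cache, fsync in the background
    case 1:
    default: return RedoAction::kWriteAndFsync;  // full durability: write and fsync at commit
  }
}

int main() {
  for (int v : {0, 1, 2})
    std::printf("innodb_flush_log_at_trx_commit=%d -> action %d\n", v,
                static_cast<int>(redo_action_at_commit(v)));
  return 0;
}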
Sync Stage
The code of the Sync stage is as follows:
/*
Stage #2: Syncing binary log file to disk
*/
if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
&LOCK_sync)) {
DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
thd->commit_error));
return finish_commit(thd);
}
/*
- sync_counter: the number of commit groups accumulated so far
- get_sync_period(): the value of the sync_binlog parameter
- If the number of commit groups in the sync stage queue reaches sync_binlog, the current leader calls fsync() to flush the binlog (sync_binlog_file(false)). Before syncing it may wait for more commit groups to join the queue; the wait is bounded by binlog_group_commit_sync_no_delay_count or binlog_group_commit_sync_delay, both 0 by default.
- If the number of commit groups in the sync stage queue is below sync_binlog, the current leader neither calls fsync() nor waits.
- If sync_binlog is 0, every commit group triggers the wait, but no sync is performed.
- If sync_binlog is 1, every commit group triggers the wait and a sync is performed.
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
Commit_stage_manager::get_instance().wait_count_or_timeout(
opt_binlog_group_commit_sync_no_delay_count,
opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);
final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
Commit_stage_manager::SYNC_STAGE);
if (flush_error == 0 && total_bytes > 0) {
DEBUG_SYNC(thd, "before_sync_binlog_file");
std::pair<bool, bool> result = sync_binlog_file(false);
sync_error = result.first;
}
/*
If sync_binlog == 1, update the binlog end position in the sync stage and broadcast the update
signal, so the replica dump threads can notice the new binlog events. (The position itself was
set in the flush stage, in process_flush_stage_queue()
|--flush_thread_caches()
|----set_trans_pos().)
*/
if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {
THD *tmp_thd = final_queue;
const char *binlog_file = nullptr;
my_off_t pos = 0;
while (tmp_thd != nullptr) {
if (tmp_thd->commit_error == THD::CE_NONE) {
tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
}
tmp_thd = tmp_thd->next_to_commit;
}
if (binlog_file != nullptr && pos > 0) {
update_binlog_end_pos(binlog_file, pos);
}
}
DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");
leave_mutex_before_commit_stage = &LOCK_sync;
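To make the sync_binlog accounting in the comment above easier to follow, here is a minimal standalone sketch (not MySQL code; SyncConfig and should_sync are invented names) of the decision the leader makes: it fsyncs only once enough commit groups have accumulated relative to sync_binlog, and sync_binlog = 0 skips the fsync entirely.
// A minimal sketch of the "sync now or not" decision described above (illustration only).
#include <cstdio>

struct SyncConfig {
  unsigned sync_binlog;                       // 0 = never fsync here, 1 = fsync every group
  unsigned group_commit_sync_delay_us;        // maximum extra wait, 0 by default
  unsigned group_commit_sync_no_delay_count;  // stop waiting once this many groups queued
};

// Returns true if this leader should call fsync() for the current group.
bool should_sync(const SyncConfig &cfg, unsigned sync_counter) {
  if (cfg.sync_binlog == 0) return false;      // the wait still happens, but no fsync
  return sync_counter + 1 >= cfg.sync_binlog;  // enough groups since the last fsync
}

int main() {
  SyncConfig cfg{1, 0, 0};
  unsigned sync_counter = 0;  // commit groups flushed since the last fsync
  for (unsigned group = 1; group <= 3; ++group) {
    if (should_sync(cfg, sync_counter)) {
      std::printf("group %u: fsync binlog\n", group);
      sync_counter = 0;
    } else {
      std::printf("group %u: skip fsync\n", group);
      ++sync_counter;
    }
  }
  return 0;
}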
Commit Stage
The code of the Commit stage is as follows:
/*
Stage #3: Commit all transactions in order.
*/
commit_stage:
/* binlog_order_commits: whether to commit in order, i.e. keep the commit order of the redo log and the binlog consistent */
if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
(sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
leave_mutex_before_commit_stage, &LOCK_commit)) {
DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
thd->commit_error));
return finish_commit(thd);
}
THD *commit_queue =
Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
Commit_stage_manager::COMMIT_STAGE);
DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
if (flush_error == 0 && sync_error == 0)
/* after_sync hook */
sync_error = call_after_sync_hook(commit_queue);
/*
Main processing logic of the Commit stage
*/
process_commit_stage_queue(thd, commit_queue);
/**
* After commit stage
*/
if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,
commit_queue, &LOCK_commit, &LOCK_after_commit)) {
DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
thd->commit_error));
return finish_commit(thd);
}
THD *after_commit_queue =
Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
Commit_stage_manager::AFTER_COMMIT_STAGE);
/* after_commit hook */
process_after_commit_stage_queue(thd, after_commit_queue);
final_queue = after_commit_queue;
mysql_mutex_unlock(&LOCK_after_commit);
} else {
if (leave_mutex_before_commit_stage)
mysql_mutex_unlock(leave_mutex_before_commit_stage);
if (flush_error == 0 && sync_error == 0)
sync_error = call_after_sync_hook(final_queue);
}
/* Broadcast the m_stage_cond_binlog condition variable to wake up the suspended followers */
Commit_stage_manager::get_instance().signal_done(final_queue);
DBUG_EXECUTE_IF("block_leader_after_delete", {
const char action[] = "now SIGNAL leader_proceed";
assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));
};);
/*
Finish the commit before executing a rotate, or run the risk of a
deadlock. We don't need the return value here since it is in
thd->commit_error, which is returned below.
*/
(void)finish_commit(thd);
DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");
return thd->commit_error == THD::CE_COMMIT_ERROR;
}
The main processing logic of the Commit stage is in the process_commit_stage_queue function:
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
mysql_mutex_assert_owner(&LOCK_commit);
#ifndef NDEBUG
thd->get_transaction()->m_flags.ready_preempt =
true; // formality by the leader
#endif
for (THD *head = first; head; head = head->next_to_commit) {
DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",
head->thread_id(), head->commit_error,
YESNO(head->tx_commit_pending)));
DBUG_EXECUTE_IF(
"block_leader_after_delete",
if (thd != head) {DBUG_SET("+d,after_delete_wait"); };);
/*
If flushing failed, set commit_error for the session, skip the
transaction and proceed with the next transaction instead. This
will mark all threads as failed, since the flush failed.
If flush succeeded, attach to the session and commit it in the
engines.
*/
#ifndef NDEBUG
Commit_stage_manager::get_instance().clear_preempt_status(head);
#endif
/*
Update the global m_max_committed_transaction (used as last_committed for later transactions)
and initialize the sequence number of this transaction's context
*/
if (head->get_transaction()->sequence_number != SEQ_UNINIT) {
mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);
m_dependency_tracker.update_max_committed(head);
mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);
}
/*
Flush/Sync error should be ignored and continue
to commit phase. And thd->commit_error cannot be
COMMIT_ERROR at this moment.
*/
assert(head->commit_error != THD::CE_COMMIT_ERROR);
Thd_backup_and_restore switch_thd(thd, head);
bool all = head->get_transaction()->m_flags.real_commit;
assert(!head->get_transaction()->m_flags.commit_low ||
head->get_transaction()->m_flags.ready_preempt);
// Binlog commit and InnoDB commit
::finish_transaction_in_engines(head, all, false);
DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",
head->commit_error, YESNO(head->tx_commit_pending)));
}
/*
Lock the sidno and update executed_gtid for the whole group of transactions.
- If binlog is disabled, @@GLOBAL.GTID_PURGED is derived from executed_gtid, so
@@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED are always identical and there is no need to
record lost_gtids.
- If binlog is enabled but log_replica_updates is disabled, the slave SQL thread or slave worker
threads update their own GTIDs into executed_gtids and lost_gtids.
*/
gtid_state->update_commit_group(first);
for (THD *head = first; head; head = head->next_to_commit) {
Thd_backup_and_restore switch_thd(thd, head);
auto all = head->get_transaction()->m_flags.real_commit;
// Only for external XA transactions: mark the transaction as prepared in TC in the storage engines
trx_coordinator::set_prepared_in_tc_in_engines(head, all);
/*
After the storage engine commit, decrement the counter of XIDs in prepared state
*/
if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);
}
}
The ::finish_transaction_in_engines function contains the main storage-engine commit logic; the related call stack is as follows:
|::finish_transaction_in_engines
|--trx_coordinator::commit_in_engines
|----ha_commit_low
// Binlog-layer commit, which does nothing (empty function)
|------binlog_commit
// Storage-engine-layer commit
|------innobase_commit
|--------innobase_commit_low
|----------trx_commit_for_mysql
// Pre-allocate the update undo segment used to persist the GTID
|------------trx_undo_gtid_add_update_undo
// Update the update_time of the modified tables in the data dictionary
|------------trx_update_mod_tables_timestamp
// Allocate the mini-transaction handle and buffer
|------------trx_commit
// Commit the mini-transaction
|--------------trx_commit_low
|----------------trx_write_serialisation_history
// Update the undo state:
// - for inserts, change the state from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE
// - for updates, change it to TRX_UNDO_TO_PURGE
// - for updates, additionally assign a trx no to the rollback segments and add them to the purge queue
|------------------trx_undo_set_state_at_finish
// Add the update undo log header to the history list and free some in-memory objects
|------------------trx_undo_update_cleanup
// Record the binlog position in the system transaction table
|------------------trx_sys_update_mysql_binlog_offset
|----------------trx_commit_in_memory
// - Close the MVCC read view
// - Persist the GTID
// - Free the insert undo log
// - Wake up background threads to do their work, such as the master thread, purge thread, and page_cleaner
Stage Transitions
The stage-transition logic is implemented mainly by the enroll_for function called from change_stage (a simplified sketch of the pattern follows this list):
- The first thread to enter the queue becomes the leader of the whole transaction group
- Threads that enter the queue afterwards become followers of the group
- Follower threads suspend themselves and wait to be woken by the m_stage_cond_binlog condition variable
- The leader commits the whole group of transactions; once the commit is done, it signals m_stage_cond_binlog to wake up the suspended followers
- The main queue-transition logic: a thread first enters the queue of the next stage, then releases the previous stage's mutex, and then acquires the next stage's mutex
  - The Flush stage does not need to acquire a previous-stage mutex
  - The Sync stage needs to acquire LOCK_sync
  - The Commit stage needs to acquire the LOCK_commit mutex
  - The After Commit stage needs to acquire the LOCK_after_commit mutex
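Before diving into the real enroll_for code, the following minimal, self-contained C++ sketch (not MySQL code; Group, commit_one, kGroupSize and the other names are invented) illustrates only the leader/follower hand-off on a condition variable that the list above describes. For simplicity the sketch uses a fixed group size, whereas in MySQL the group is simply whatever queued up while the previous group was committing.
// A minimal sketch of the leader/follower group-commit pattern (illustration only).
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

constexpr std::size_t kGroupSize = 4;

struct Group {
  std::mutex m;
  std::condition_variable cond;
  std::vector<int> queue;  // pending transaction ids
  bool done = false;
};

void commit_one(Group &g, int trx_id) {
  std::unique_lock<std::mutex> lk(g.m);
  const bool leader = g.queue.empty();  // the first thread to enqueue becomes the leader
  g.queue.push_back(trx_id);
  if (leader) {
    // For this sketch the leader simply waits until the whole group has enqueued.
    g.cond.wait(lk, [&] { return g.queue.size() == kGroupSize; });
    for (int id : g.queue) std::printf("leader commits trx %d\n", id);
    g.done = true;
    g.cond.notify_all();  // wake up the suspended followers
  } else {
    g.cond.notify_all();                      // let the leader re-check the group size
    g.cond.wait(lk, [&] { return g.done; });  // follower suspends until the leader is done
  }
}

int main() {
  Group g;
  std::vector<std::thread> threads;
  for (int i = 1; i <= static_cast<int>(kGroupSize); ++i)
    threads.emplace_back(commit_one, std::ref(g), i);
  for (auto &t : threads) t.join();
  return 0;
}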
bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
mysql_mutex_t *stage_mutex,
mysql_mutex_t *enter_mutex) {
// If the queue is empty, this thread is the leader
thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();
bool leader = this->append_to(stage, thd);
/*
If the FLUSH stage queue (BINLOG_FLUSH_STAGE or COMMIT_ORDER_FLUSH_STAGE) is not empty, this
thread cannot become the leader. The leader needs to acquire enter_mutex.
*/
if (leader) {
if (stage == COMMIT_ORDER_FLUSH_STAGE) {
leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();
/*
Leader hand-off logic. A session can sit in one of five queues:
- Binlog flush queue: flushes the redo log and writes the binlog file
- Commit order flush queue: for commit-order transactions; they take part only in the first portion of group commit, up to the engine-level flush
- Sync queue: syncs the transaction
- Commit queue: commits the transaction
- After commit queue: invokes the transaction's after_commit hook
*/
} else if (stage == BINLOG_FLUSH_STAGE &&
// The current thread is the first thread in BINLOG_FLUSH_STAGE, but COMMIT_ORDER_FLUSH_STAGE
// already has a leader; in that case the current thread suspends and waits to be woken by a
// signal from the COMMIT_ORDER_FLUSH_STAGE leader
!m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {
/*
The current transaction is the first thread in the binlog queue, but the commit order queue
already has a leader. In this case the current thread becomes the leader, and the commit order
leader turns into a follower. The reason for changing the leader is that the commit order leader
cannot act as the leader of the binlog threads, because commit order threads must leave the
commit group before the binlog threads finish their work. The hand-off from leader to follower
works as follows:
1. A commit order thread enters the flush stage first and becomes the commit order leader.
2. The commit order leader tries to acquire the stage mutex, which may take some time, for
   example if the mutex is still held by the leader of the previous commit group.
3. Meanwhile, a binlog thread enters the flush stage. It has to wait for a signal from the
   commit order leader.
4. The commit order leader acquires the stage mutex, then checks whether any binlog thread has
   entered the flush stage; if it finds one, it hands over leadership.
5. The commit order leader sends a signal to the binlog leader and becomes a follower, waiting
   for the commit to finish (just like the other followers).
6. The binlog leader is woken by the commit order leader's signal and performs the group commit.
*/
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");
while (thd->tx_commit_pending)
mysql_cond_wait(&m_stage_cond_leader,
&m_queue_lock[BINLOG_FLUSH_STAGE]);
}
}
unlock_queue(stage);
/*
Notify the next group-commit transaction that it can enter the queue
*/
if (stage == BINLOG_FLUSH_STAGE) {
Commit_order_manager::finish_one(thd);
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");
} else if (stage == COMMIT_ORDER_FLUSH_STAGE) {
Commit_order_manager::finish_one(thd);
}
/*
When entering the first stage, there is no need to hold a stage mutex
*/
if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);
/*
If the queue was not empty, the current thread acts as a follower and waits for the leader to process the queue
*/
if (!leader) {
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");
mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
thd->get_transaction()->m_flags.ready_preempt = true;
if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);
#endif
// tx_commit_pending: the transaction's commit has not finished yet
while (thd->tx_commit_pending) {
if (stage == COMMIT_ORDER_FLUSH_STAGE) {
mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
} else {
// The follower thread suspends here and is woken up after the leader has finished committing the transactions
mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
}
}
mysql_mutex_unlock(&m_lock_done);
return false;
}
#ifndef NDEBUG
if (stage == Commit_stage_manager::SYNC_STAGE)
DEBUG_SYNC(thd, "bgc_between_flush_and_sync");
#endif
bool need_lock_enter_mutex = false;
if (leader && enter_mutex != nullptr) {
/*
If LOCK_log has already been acquired because the binlog is being rotated, there is no need to acquire enter_mutex again.
*/
need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
enter_mutex == mysql_bin_log.get_log_lock());
if (need_lock_enter_mutex)
mysql_mutex_lock(enter_mutex);
else
mysql_mutex_assert_owner(enter_mutex);
}
// Leader hand-off logic
if (stage == COMMIT_ORDER_FLUSH_STAGE) {
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_commit_order_thread_becomes_leader");
lock_queue(stage);
if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {
if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);
THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();
binlog_leader->tx_commit_pending = false;
mysql_cond_signal(&m_stage_cond_leader);
unlock_queue(stage);
mysql_mutex_lock(&m_lock_done);
/* wait for signal from binlog leader */
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_commit_order_leader_waits_for_binlog_leader");
while (thd->tx_commit_pending)
mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
mysql_mutex_unlock(&m_lock_done);
leader = false;
return leader;
}
}
return leader;
}