MySQL事務的提交采用兩階段提交協議, 前些日子和同事聊的時候發現對提交的細節還是有些模糊,這里對照MySQL原始碼詳細記錄一下,版本是MySQL5.7.36,
一. 事務的提交流程,
1. 獲取 MDL_key::COMMIT 鎖: FTWRL會阻塞 commit 操作,
-------------------------------- 接下來進入 prepare 階段:
2. binlog prepare: 將上一次 commit 佇列中的最大的 seq_no 寫入本次事務的 last_commit 中,【用戶發起的顯式提交(顯式commit), 或者DDL發起的隱式提交沒有這一步】
3. innodb prepare:
3.1 獲取執行緒的 xid, 設定為事務的 xid.
3.2 修改Innodb中事務的狀態為 Prepare 狀態
3.3 將 undo 日志端從 active 設定為 prepare 狀態, 并在undo段的第一個undo的last undo log header中寫入xid.
-------------------------------- 接下來進入 commit 階段:
4. XID_Event生成并寫入binlog cache中,會首先將事務中的寫操作生成的event flush到binlog cache中, 再寫入 xid event,這也符合binlog中事務event順序,
5. binlog flush:
5.1. 當前執行緒加入flush佇列, 如果flush佇列是空的, 則當前為leader; 如果為非空, 則為 follower, 非leader執行緒將被阻塞, 直到commit之后被leader執行緒喚醒, 完成提交,
5.2. 獲取Lock log鎖
5.3. 對flush佇列進行fetch, 本次處理的佇列就固定了
5.4. 在innodb存盤引擎中flush redo log, 做 innodb層redo持久化
5.5. 為flush佇列中每個事務生成gtid
5.6. 將flush佇列中每個執行緒的binlog cache flush到 binlog檔案中, 這里包含三步:
a. 直接將 GTID event 寫入 binlog磁盤檔案中
b. 將事務生成的別的 event 寫入 binlog file cache中
c. 將 binlog cache[IO cache] flush到檔案
5.7. 判斷 binlog 是否需要切換, 設定切換標記,注意:這里是將事務的event寫入binlog file cache后再判斷, 因此一個事務的binlog都位于同一個binlog檔案中
5.8. after_flush hook:如果sync_binlog != 1, 那么在這里更新 binlog 點位, 通知dump執行緒向從節點發送 event
6. binlog sync:
6.1. 形成 sync 佇列, 第一個進入 sync 佇列的leader為本階段的laeder, 其他flush佇列加入sync佇列的leader執行緒將被阻塞, 直到commit階段被leader執行緒喚醒, finish_commit
6.2. 釋放 Lock log mutex, 獲取 Lock sync mutex
6.3. 根據 delay 的設定來決定是否延遲一段時間, 使得sync佇列變大, last commit是在binlog prepare時生成, 這時last commit尚未修改, 因此加入sync佇列的事務是同一組事務, 可以提高從庫 mts 效率,
6.4. fetch sync佇列, 對 sync 佇列進行固化
6.5. sync binlog file到磁盤中, 需要根據sync_binlog的設定來決定是否刷盤
6.6. 如果 sync_binlog = 1, 那么更新 binlog end pos, 通知 dump執行緒發送 event
7. commit階段:
如果需要按順序提交: order_commits:
7.1. sync佇列加入commit佇列, 第一個進入的 sync 佇列的leader為本階段的leader, 其他sync佇列加入commit佇列的leader會被阻塞, 直到 commit階段后被leader執行緒喚醒, 進入 finish commit
7.2. 釋放 Lock sync mutex,獲取 lock commit mutex
7.3. fetch commit佇列, 對 commit 佇列進行固化
7.4. 呼叫 after_sync hook: 這里, 如果半同步復制為 after_sync, 則需要等待dump執行緒收到從節點對于commit佇列中最大的binlog filename和 pos的ack,
7.5. 在 Innodb 層提交之前變更 last_commit, 將其變更為 commit 佇列中最大的 seqno
7.6. COMMIT佇列中每個事務按照順序進行存盤引擎層提交
7.7. 變更 gtid_executed
7.8. 釋放 lock commit mutex
如果不需要按順序提交:
7.1. 呼叫 after_sync hook: 這里, 如果半同步復制為 after_sync, 則需要等待dump執行緒收到從節點對于commit佇列中最大的binlog filename和 pos的ack,
8. 收尾:
8.1. leader 執行緒喚醒組內成員, 進行各自操作
8.2. commit 佇列中事務清空 binlog cache 臨時檔案和記憶體
8.3. 如果不需要按順序提交:則commit佇列中執行緒各自進行存盤引擎層的提交, 提交完成之后更新 gtid_executed
8.4. 決定是否進行 binlog 的 rotate
8.5. 如果 rotate 了 binlog, 則根據 expire_log_days 判斷是否需要清理 binlog
二. 流程代碼,
主代碼如下,洗掉了部分輔助代碼,從 trans_commit(THD *thd) 函式開始,
bool trans_commit(THD *thd)
{ // 提交事務, res = ha_commit_trans(thd, TRUE); if (res == FALSE) if (thd->rpl_thd_ctx.session_gtids_ctx().notify_after_transaction_commit(thd)) sql_print_warning("Failed to collect GTID to send in the response packet!"); thd->server_status &= ~SERVER_STATUS_IN_TRANS; thd->variables.option_bits &= ~OPTION_BEGIN; thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION); thd->lex->start_transaction_opt = 0; /* The transaction should be marked as complete in P_S. */ assert(thd->m_transaction_psi == NULL); thd->tx_priority = 0; trans_track_end_trx(thd); DBUG_RETURN(MY_TEST(res)); }
/*
提交事務,
server層最后呼叫函式 ha_commit_trans(), 該函式負責處理 binlog 層和存盤引擎層的提交,
*/
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock)
{
// 讀寫事務 && 不能忽略全域讀鎖
if (rw_trans && !ignore_global_read_lock)
{
/*
獲取一個 MDL_KEY::COMMIT 元資料鎖, 該元資料鎖將確保 commit 操作會被活躍的 FTWRL 鎖阻止,
FTWRL鎖會阻塞 COMMIT 操作,
*/
MDL_REQUEST_INIT(&mdl_request,
MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
DBUG_PRINT("debug", ("Acquire MDL commit lock"));
// 申請 MDL_key::COMMIT 鎖, 申請失敗
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
DBUG_RETURN(1);
}
release_mdl = true;
}
// 判斷是否開啟 xa 事務;
// 所有的 entries 都支持 2pc && 在事務 scope 中設定做讀寫更改的引擎數量 > 1
if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
// prepare; 在事務協調器中 prepare commit tx, 在引擎層生成一個 XA 事務,
// tc_log: mysqld啟動時生成的 MySQL_BIN_LOG 物件[XA控制物件],
error = tc_log->prepare(thd, all);
}
/*
XA 事務的狀態變更為 prepared, 中間態,最侄訓變成常規的 NOTR 狀態,
*/
if (!error && all && xid_state->has_state(XID_STATE::XA_IDLE))
{
assert(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
static_cast<Sql_cmd_xa_commit *>(thd->lex->m_sql_cmd)->get_xa_opt() == XA_ONE_PHASE);
// 設定 XA 事務狀態為 XA_PREPARED 狀態,
xid_state->set_state(XID_STATE::XA_PREPARED);
}
/**
* XA 事務提交
*/
if (error || (error = tc_log->commit(thd, all)))
{
ha_rollback_trans(thd, all);
error = 1;
goto end;
}
end:
// 釋放 mdl 鎖,
if (release_mdl && mdl_request.ticket)
{
thd->mdl_context.release_lock(mdl_request.ticket);
}
/*
* 釋放資源并執行其他清理,空事務也需要,
*/
if (is_real_trans)
{
trn_ctx->cleanup();
thd->tx_priority = 0;
}
}
int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
{
/*
設定 HA_IGNORE_DURABILITY 在 prepare 階段不將事務的 prepare record 刷到 innodb redo log,
這樣在 binlog 組提交的 flush 階段 flushing binlog 之前 flush prepare record 到 innodb redo log,
在 innodb prepare 時, 不刷 redo log.
*/
thd->durability_property = HA_IGNORE_DURABILITY;
// 在引擎中 prepare commit trx
int error = ha_prepare_low(thd, all);
DBUG_RETURN(error);
}
/**
* prepare commit trx
* 在引擎層 prepare commit trx
* 包括 binlog引擎 和 innodb引擎
*/
int ha_prepare_low(THD *thd, bool all)
{
// 遍歷引擎
if (ha_info)
{
for (; ha_info && !error; ha_info = ha_info->next())
{
int err = 0;
// 引擎
handlerton *ht = ha_info->ht();
/*
如果這個特定事務是只讀的, 不要呼叫兩階段提交,
*/
if (!ha_info->is_trx_read_write())
continue;
/**
* 呼叫引擎的 prepare 在存盤層生成 XA 事務,
* 先 binlog prepare, 再 innodb prepare;
* binlog prepare: 將上一次 commit 佇列中最大的 seq num 寫入本次事務的 last_commit 中
* innodb prepare: 在 innodb 中更改 undo 日志段的狀態為 trx_undo_prepared, 并將 xid 寫入 undo log header,
* */
if ((err = ht->prepare(ht, thd, all)))
{
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
error = 1;
}
// ha_prepare_count++
thd->status_var.ha_prepare_count++;
}
}
}
/*
* binlog prepare;
*/
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
if (!all)
{
// 將上一次 commit 佇列中最大的 seq number 寫入本次事務的 last_commit 中,
thd->get_transaction()->store_commit_parent(mysql_bin_log.m_dependency_tracker.get_max_committed_timestamp());
}
DBUG_RETURN(all && is_loggable_xa_prepare(thd) ? mysql_bin_log.commit(thd, true) : 0);
}
/*******************************************************************/ /**
Innodb prepare 一個 X/Open XA 分布式事務,
static int
innobase_xa_prepare(
/*================*/
handlerton *hton, /*!< in: InnoDB handlerton ; innodb引擎 */
THD *thd, /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be prepared ; mysql執行緒 */
bool prepare_trx) /*!< in: true - prepare transaction
false - the current SQL statement
ended ; true: prepare 事務
false: 當前 SQL 陳述句結束, 陳述句級別的提交 */
{
// trx
trx_t *trx = check_trx_exists(thd);
// 獲取thd的 xid, 同時設定到 trx -> xid 中
thd_get_xid(thd, (MYSQL_XID *)trx->xid);
/* 釋放可能的 FIFO ticket 和 search latch,
因為我們要保留 trx_sys -> mutex, 我們必須首先釋放 search system latch 來遵守鎖存順序,
*/
trx_search_latch_release_if_reserved(trx);
// prepare trx
if (prepare_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
/* preapre 整個事務, 或者這是一個SQL陳述句結束, autocommit 是打開狀態 */
// 事務已經在 mysql 2pc 協調器中注冊,
ut_ad(trx_is_registered_for_2pc(trx));
// trx prepare
dberr_t err = trx_prepare_for_mysql(trx);
}
else
{
/* 陳述句的提交動作, 而非真正的事務提交, */
// 需要釋放陳述句 hold 的 auto_increment 鎖
lock_unlock_table_autoinc(trx);
// 記錄本陳述句的 undo 資訊, 以便陳述句級的回滾
// 標記最新SQL陳述句結束,
trx_mark_sql_stat_end(trx);
}
return (0);
}
/**
* trx prepare
*/
dberr_t
trx_prepare_for_mysql(trx_t *trx)
{
trx->op_info = "preparing";
// prepare trx.
trx_prepare(trx);
}
/****************************************************************/ /**
prepare trx.*/
static void
trx_prepare(
/*========*/
trx_t *trx) /*!< in/out: transaction */
{
// 回滾段 != NULL && redo 段被修改
if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx))
{
// 為指定的回滾段 preapre 一個事務,lsn 為當前已 commit 的 lsn
lsn = trx_prepare_low(trx, &trx->rsegs.m_redo, false);
}
if (trx->rsegs.m_noredo.rseg != NULL && trx_is_noredo_rseg_updated(trx))
{
// 為指定的回滾段 preapre 一個事務,
trx_prepare_low(trx, &trx->rsegs.m_noredo, true);
}
/*--------------------------------------*/
// 事務狀態為 TRX_STATE_ACTIVE 狀態, 修改事務狀態
trx->state = TRX_STATE_PREPARED;
// 事務系統中處于 xa prepared 狀態的事務的數量
trx_sys->n_prepared_trx++;
/*--------------------------------------*/
/* Release read locks after PREPARE for READ COMMITTED
and lower isolation.
對 rc 隔離級別, 在 prepare 之后釋放 read locks, 降低隔離度
*/
if (trx->isolation_level <= TRX_ISO_READ_COMMITTED)
{
/* Stop inheriting GAP locks.
停止繼承 GAP lock,
*/
trx->skip_lock_inheritance = true;
/* Release only GAP locks for now.
釋放 GAP lock,
*/
lock_trx_release_read_locks(trx, true);
}
switch (thd_requested_durability(trx->mysql_thd))
{
case HA_IGNORE_DURABILITY:
/*
在 binlog group commit 的 prepare 階段, 我們設定 HA_IGNORE_DURABILITY , 這樣在這個階段不會 flush redo log,
這樣我們就可以在 binlog group commit 的 flush 階段在將 binary log寫入二進制日志之前, 在一個組中 flush redo log,
*/
break;
case ..
}
}
/****************************************************************/ /**
為指定的回滾段 preapre 一個事務, */
static lsn_t
trx_prepare_low(
/*============*/
trx_t *trx, /*!< in/out: transaction */
trx_undo_ptr_t *undo_ptr, /*!< in/out: pointer to rollback
segment scheduled for prepare. 指向回滾段的指標 */
bool noredo_logging) /*!< in: turn-off redo logging. 不需要redo log */
{
lsn_t lsn;
// insert 或者 undo 回滾段不為 NULL
if (undo_ptr->insert_undo != NULL || undo_ptr->update_undo != NULL)
{
// start a sync mtr
mtr_start_sync(&mtr);
// 設定 mtr mode
if (noredo_logging)
{
mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
}
/*
將 undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared:
更改 undo 回滾段將其設定為 prepare 狀態,
*/
mutex_enter(&rseg->mutex);
// insert undo log 不為 NULL
if (undo_ptr->insert_undo != NULL)
{
/*
這里不需要獲取 trx->undo_mutex, 因為只允許一個 OS 執行緒為該事務做事務準備,
*/
// 將 undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
trx_undo_set_state_at_prepare(
trx, undo_ptr->insert_undo, false, &mtr);
}
// 將 undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
if (undo_ptr->update_undo != NULL)
{
trx_undo_set_state_at_prepare(
trx, undo_ptr->update_undo, false, &mtr);
}
mutex_exit(&rseg->mutex);
lsn = mtr.commit_lsn();
}
else
{
lsn = 0;
}
return (lsn);
}
/* 修改 undo 日志段的狀態*/
page_t *
trx_undo_set_state_at_prepare(
trx_t *trx,
trx_undo_t *undo,
bool rollback,
mtr_t *mtr)
{
// 獲取 undo page 頁, 并在其上加 x-latch
undo_page = trx_undo_page_get(
page_id_t(undo->space, undo->hdr_page_no),
undo->page_size, mtr);
// undo 段 header
seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
// 如果是 XA rollback
if (rollback)
{
ut_ad(undo->state == TRX_UNDO_PREPARED);
// 將 undo 段的狀態從 TRX_UNDO_PREPARED 修改為 TRX_UNDO_ACTIVE 狀態
mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, TRX_UNDO_ACTIVE,
MLOG_2BYTES, mtr);
return (undo_page);
}
/*------------------------------*/
// 是 XA prepare, 則將 undo 段的狀態從 TRX_UNDO_ACTIVE 修改為 TRX_UNDO_PREPARED, 并將 xid 寫入 undo,
ut_ad(undo->state == TRX_UNDO_ACTIVE);
undo->state = TRX_UNDO_PREPARED;
undo->xid = *trx->xid;
/*------------------------------*/
// 在 undo 段中更新當前 undo 段的狀態
mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, undo->state,
MLOG_2BYTES, mtr);
// 在 undo 段 last undo log header 中寫入 xid
offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG);
undo_header = undo_page + offset;
mlog_write_ulint(undo_header + TRX_UNDO_XID_EXISTS,
TRUE, MLOG_1BYTE, mtr);
trx_undo_write_xid(undo_header, &undo->xid, mtr);
return (undo_page);
}
/*
在事務協調器中提交事務,
該函式將在二進制日志和存盤引擎中提交事務,
*/
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
{
........
{
/* The prepare phase of XA transaction two phase logging. */
int err = 0;
bool one_phase = get_xa_opt(thd) == XA_ONE_PHASE;
assert(thd->lex->sql_command != SQLCOM_XA_COMMIT || one_phase);
// xid event 生成并寫入 binlog cache, 在真正的寫操作陳述句生成的event之后
XID_STATE *xs = thd->get_transaction()->xid_state();
XA_prepare_log_event end_evt(thd, xs->get_xid(), one_phase);
err = cache_mngr->trx_cache.finalize(thd, &end_evt, xs);
........
}
trx_stuff_logged = true;
}
........
// 提交,
// ordered_commit: 事務在 binlog 階段提交的核心函式,
if (ordered_commit(thd, all, skip_commit))
DBUG_RETURN(RESULT_INCONSISTENT);
/*
Mark the flag m_is_binlogged to true only after we are done
with checking all the error cases.
檢查完所有錯誤情況后, 將標記 m_is_binlogged 標記為 true.
*/
if (is_loggable_xa_prepare(thd))
thd->get_transaction()->xid_state()->set_binlogged();
}
DBUG_RETURN(RESULT_SUCCESS);
}
/**
Flush and commit the transaction.
This will execute an ordered flush and commit of all outstanding
transactions and is the main function for the binary log group
commit logic. The function performs the ordered commit in two
phases.
The first phase flushes the caches to the binary log and under
LOCK_log and marks all threads that were flushed as not pending.
The second phase executes under LOCK_commit and commits all
transactions in order.
The procedure is:
1. Queue ourselves for flushing.
2. Grab the log lock, which might result is blocking if the mutex is
already held by another thread.
3. If we were not committed while waiting for the lock
1. Fetch the queue
2. For each thread in the queue:
a. Attach to it
b. Flush the caches, saving any error code
3. Flush and sync (depending on the value of sync_binlog).
4. Signal that the binary log was updated
4. Release the log lock
5. Grab the commit lock
1. For each thread in the queue:
a. If there were no error when flushing and the transaction shall be committed:
- Commit the transaction, saving the result of executing the commit.
6. Release the commit lock
7. Call purge, if any of the committed thread requested a purge.
8. Return with the saved error code
*/
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
/*
Stage #1: flushing transactions to binary log
階段1: 將事務 flush 到二進制日志
While flushing, we allow new threads to enter and will process
them in due time. Once the queue was empty, we cannot reap
anything more since it is possible that a thread entered and
appointed itself leader for the flush phase.
在 flush 時, 允許新的執行緒進入, 并在適當的時間處理他們,
一旦佇列變空, 我們就不能再識訓任何東西了, 因為可能有一個執行緒進入了佇列并
指定自己為flush階段的 leader,
*/
#ifdef HAVE_REPLICATION
/**
* 先形成 flush 佇列, 非 leader 執行緒將被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
* 然后leader執行緒獲取 Lock log鎖
*/
if (has_commit_order_manager(thd))
{
Slave_worker *worker = dynamic_cast<Slave_worker *>(thd->rli_slave);
Commit_order_manager *mngr = worker->get_commit_order_manager();
if (mngr->wait_for_its_turn(worker, all))
{
thd->commit_error = THD::CE_COMMIT_ERROR;
DBUG_RETURN(thd->commit_error);
}
// 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到commit之后被 leader 執行緒喚醒, 非 leader 執行緒這里回傳 true, 執行緒應該等待提交完成,
if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
DBUG_RETURN(finish_commit(thd));
}
else
#endif
// 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到被 leader 執行緒喚醒, 非 leader 執行緒這里回傳 true, 執行緒應該等待提交完成,
if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
{
DBUG_RETURN(finish_commit(thd));
}
THD *wait_queue = NULL, *final_queue = NULL;
mysql_mutex_t *leave_mutex_before_commit_stage = NULL;
my_off_t flush_end_pos = 0;
bool update_binlog_end_pos_after_sync;
DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage");
// 執行 flush 階段操作,
/*
* 1. 對 flush 佇列進行 fetch, 本次處理的flush佇列就固定了
2. 在 innodb 存盤引擎中 flush redo log, 做 innodb 層 redo 持久化,
3. 為 flush 佇列中每個事務生成 gtid,
4. 將 flush佇列中每個執行緒的 binlog cache flush 到 binlog 日志檔案中,這里包含兩步:
1. 將事務的 GTID event直接寫入 binlog 磁盤檔案中
2. 將事務生成的別的 event 寫入 binlog file cache 中
*/
flush_error = process_flush_stage_queue(&total_bytes, &do_rotate,
&wait_queue);
// 將 binary log cache(IO cache) flush到檔案中
if (flush_error == 0 && total_bytes > 0)
flush_error = flush_cache_to_file(&flush_end_pos);
// sync_binlog 是否等于 1
update_binlog_end_pos_after_sync = (get_sync_period() == 1);
/*
如果 flush 操作成功, 則呼叫 after_flush hook,
*/
if (flush_error == 0)
{
const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
assert(flush_end_pos != 0);
if (RUN_HOOK(binlog_storage, after_flush,
(thd, file_name_ptr, flush_end_pos)))
{
sql_print_error("Failed to run 'after_flush' hooks");
flush_error = ER_ERROR_ON_WRITE;
}
// 不等于 1, 通知 dump 執行緒
if (!update_binlog_end_pos_after_sync)
// 更新 binlog end pos, 通知 dump 執行緒向從庫發送 event
update_binlog_end_pos();
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
}
if (flush_error)
{
/*
Handle flush error (if any) after leader finishes it's flush stage.
如果存在 flush 錯誤, 則處理 flush錯誤
*/
handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */,
(thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR)
? ER(ER_GNO_EXHAUSTED)
: NULL);
}
/*
Stage #2: Syncing binary log file to disk
sync binary log file to disk.
*/
/** 釋放 Lock_log mutex, 獲取 Lock_sync mutex
* 第一個進入的 flush 佇列的 leader 為本階段的 leader, 其他 flush 佇列加入 sync 佇列, 其他 flush 佇列的
* leader會被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
* */
if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
{
DBUG_RETURN(finish_commit(thd));
}
/*
根據 delay 的設定來決定是否延遲一段時間, 如果 delay 的時間越久, 那么加入 sync 佇列的
事務就越多【last commit 是在 binlog prepare 時生成的, 尚未更改, 因此加入 sync 佇列的
事務是同一組事務】, 提高了從庫 mts 的效率,
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
stage_manager.wait_count_or_timeout(opt_binlog_group_commit_sync_no_delay_count,
opt_binlog_group_commit_sync_delay,
Stage_manager::SYNC_STAGE);
// fetch sync 佇列, 對 sync 佇列進行固化,
final_queue = stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
// 這里 sync_binlog file到磁盤中
if (flush_error == 0 && total_bytes > 0)
{
// 根據 sync_binlog 的設定決定是否刷盤
std::pair<bool, bool> result = sync_binlog_file(false);
sync_error = result.first;
}
// 在這里 sync_binlog = 1, 更新 binlog end_pos, 通知 dump 執行緒發送 event
if (update_binlog_end_pos_after_sync)
{
THD *tmp_thd = final_queue;
const char *binlog_file = NULL;
my_off_t pos = 0;
while (tmp_thd->next_to_commit != NULL)
tmp_thd = tmp_thd->next_to_commit;
if (flush_error == 0 && sync_error == 0)
{
tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
// 更新 binlog end pos, 通知 dump 執行緒
update_binlog_end_pos(binlog_file, pos);
}
}
leave_mutex_before_commit_stage = &LOCK_sync;
/*
Stage #3: Commit all transactions in order.
按順序在 Innodb 層提交所有事務,
如果我們不需要對提交順序進行排序, 并且每個執行緒必須執行 handlerton 提交, 那么這個階段可以跳過,
然而, 由于我們保留了前一階段的鎖, 如果我們跳過這個階段, 則必須進行解鎖,
*/
commit_stage:
// 如果需要順序提交
if (opt_binlog_order_commits &&
(sync_error == 0 || binlog_error_action != ABORT_SERVER))
{
// SYNC佇列加入 COMMIT 佇列, 第一個進入的 SYNC 佇列的 leader 為本階段的 leader,其他 sync 佇列
// 加入 commit 佇列的 leade 會被阻塞, 直到 COMMIT 階段后被 leader 執行緒喚醒,
// 釋放 lock_sync mutex, 持有 lock_commit mutex.
if (change_stage(thd, Stage_manager::COMMIT_STAGE,
final_queue, leave_mutex_before_commit_stage,
&LOCK_commit))
{
DBUG_RETURN(finish_commit(thd));
}
// 固化 commit 佇列
THD *commit_queue = stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
// 呼叫 after_sync hook
if (flush_error == 0 && sync_error == 0)
// 呼叫 after_sync hook.注意:對于after_sync, 這里將等待binlog dump 執行緒收到slave節點關于佇列中事務最新的 binlog_file和 binlog_pos的ACK,
sync_error = call_after_sync_hook(commit_queue);
/*
process_commit_stage_queue 將為佇列中每個 thd 持有的 GTID
呼叫 update_on_commit 或 update_on_rollback,
這樣做的目的是確保 gtid 按照順序添加到 GTIDs中, 避免出現不必要的間隙
如果我們只允許每個執行緒在完成提交時呼叫 update_on_commit, 則無法保證 GTID
順序, 并且 gtid_executed 之間可能出現空隙,發生這種情況, server必須從
Gtid_set 中添加和洗掉間隔, 添加或洗掉間隔需要一個互斥鎖, 這會降低性能,
*/
process_commit_stage_queue(thd, commit_queue);
// 退出 Lock_commit 鎖
mysql_mutex_unlock(&LOCK_commit);
/*
Process after_commit after LOCK_commit is released for avoiding
3-way deadlock among user thread, rotate thread and dump thread.
在 LOCK_commit 釋放之后處理 after_commit 來避免 user thread, rotate thread 和 dump thread的
3路死鎖,
*/
process_after_commit_stage_queue(thd, commit_queue);
final_queue = commit_queue;
}
else
{
// 釋放鎖, 呼叫 after_sync hook.
if (leave_mutex_before_commit_stage)
mysql_mutex_unlock(leave_mutex_before_commit_stage);
if (flush_error == 0 && sync_error == 0)
sync_error = call_after_sync_hook(final_queue);
}
/*
Handle sync error after we release all locks in order to avoid deadlocks
為了避免死鎖, 在釋放所有的 locks 之后處理sync error
*/
if (sync_error)
handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */, NULL);
/* Commit done so signal all waiting threads
commit完成之后通知所有處于 wait 狀態的執行緒
*/
stage_manager.signal_done(final_queue);
/*
Finish the commit before executing a rotate, or run the risk of a
deadlock. We don't need the return value here since it is in
thd->commit_error, which is returned below.
在執行 rotate 之前完成commit, 否則可能出現死鎖,
*/
(void)finish_commit(thd);
/*
If we need to rotate, we do it without commit error.
Otherwise the thd->commit_error will be possibly reset.
rotate
*/
if (DBUG_EVALUATE_IF("force_rotate", 1, 0) ||
(do_rotate && thd->commit_error == THD::CE_NONE &&
!is_rotating_caused_by_incident))
{
/*
如果需要進行 binlog rotate, 則進行 rotate 操作,
*/
DEBUG_SYNC(thd, "ready_to_do_rotation");
bool check_purge = false;
mysql_mutex_lock(&LOCK_log);
/*
If rotate fails then depends on binlog_error_action variable
appropriate action will be taken inside rotate call.
*/
int error = rotate(false, &check_purge);
mysql_mutex_unlock(&LOCK_log);
if (error)
thd->commit_error = THD::CE_COMMIT_ERROR;
else if (check_purge)
// rotate判斷是否需要 expire binlog.
purge();
}
/*
flush or sync errors are handled above (using binlog_error_action).
Hence treat only COMMIT_ERRORs as errors.
*/
DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR);
}
/**
Enter a stage of the ordered commit procedure.
進入有序提交階段,
Entering is stage is done by:
- Atomically enqueueing a queue of processes (which is just one for
the first phase).
將執行緒入隊,如果佇列是空的,那么執行緒是該階段的leader, 要處理整個佇列,
- If the queue was empty, the thread is the leader for that stage
and it should process the entire queue for that stage.
如果佇列不是空的, 執行緒將作為 follower 等待commit結束,
- If the queue was not empty, the thread is a follower and can go
waiting for the commit to finish.
The function will lock the stage mutex if it was designated the
leader for the phase.
*/
bool MYSQL_BIN_LOG::change_stage(THD *thd,
Stage_manager::StageID stage, THD *queue,
mysql_mutex_t *leave_mutex,
mysql_mutex_t *enter_mutex)
{
assert(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
assert(enter_mutex);
assert(queue);
/*
一旦會話入隊, enroll_for 將釋放 leave_mutex
*/
// 當前執行緒非 leader 執行緒, 非 leader 執行緒將被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
if (!stage_manager.enroll_for(stage, queue, leave_mutex))
{
assert(!thd_get_cache_mngr(thd)->dbug_any_finalized());
DBUG_RETURN(true);
}
/*
* 以下是 leader, 獲取 enter_mutex
*/
bool need_lock_enter_mutex =
!(is_rotating_caused_by_incident && enter_mutex == &LOCK_log);
if (need_lock_enter_mutex)
mysql_mutex_lock(enter_mutex);
else
mysql_mutex_assert_owner(enter_mutex);
DBUG_RETURN(false);
}
// 回傳是否是 leader 執行緒, 非 leader 執行緒將被阻塞, 直到 commit階段被 leader執行緒喚醒
bool Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
{
// 如果佇列是空的, 那么我們就是 leader
bool leader = m_queue[stage].append(thd);
#ifdef HAVE_REPLICATION
// 如果處于 flush 階段并且存在 commit_order
if (stage == FLUSH_STAGE && has_commit_order_manager(thd))
{
// slave worker執行緒
Slave_worker *worker = dynamic_cast<Slave_worker *>(thd->rli_slave);
// slave worker 執行緒的 commit_order_manager
Commit_order_manager *mngr = worker->get_commit_order_manager();
// 取消注冊事務, worker為執行該事務的執行緒
mngr->unregister_trx(worker);
}
#endif
/*
如果 state_mutex 是 Lock_log, 那么當 binlog rotate時不應該解決, 在 rotation 時應該保持這個鎖,
*/
bool need_unlock_stage_mutex =
!(mysql_bin_log.is_rotating_caused_by_incident &&
stage_mutex == mysql_bin_log.get_log_lock());
if (stage_mutex && need_unlock_stage_mutex)
mysql_mutex_unlock(stage_mutex);
/*
如果 queue不是empty, 那么我們是跟隨者并等待leader處理這個queue,
如果我們持有一個 mutex, 那必須在 sleep 之前釋放這個mutex
*/
// 非 leader 執行緒將被阻塞
if (!leader)
{
mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
/*
Leader can be awaiting all-clear to preempt follower's execution.
With setting the status the follower ensures it won't execute anything
including thread-specific code.
*/
thd->get_transaction()->m_flags.ready_preempt = 1;
if (leader_await_preempt_status)
mysql_cond_signal(&m_cond_preempt);
#endif
while (thd->get_transaction()->m_flags.pending)
mysql_cond_wait(&m_cond_done, &m_lock_done);
mysql_mutex_unlock(&m_lock_done);
}
return leader;
}
/*
執行 flush 階段,
*/
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var)
{
// 需要持有 LOCK_log 鎖
mysql_mutex_assert_owner(&LOCK_log);
/*
獲取整個 flush 佇列并清空flush佇列, 對 flush 佇列進行固化,同時便于下一批事務可以使用 flush 佇列,
*/
THD *first_seen = stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
/*
在 innodb 存盤引擎中 flush redo log, 做 innodb 層 redo 持久化,
*/
ha_flush_logs(NULL, true);
// 為 flush 佇列中每個事務生成 gtid
assign_automatic_gtids_to_flush_group(first_seen);
/*
重繪每個執行緒的 binlog cache 到 binlog 檔案中,
*/
for (THD *head = first_seen; head; head = head->next_to_commit)
{
// 將執行緒的 binlog cache 重繪到 binlog 日志檔案
// 首先將 gtid event 直接寫入 binlog,
// 再將 other event 寫入到 binlog file cache 中
std::pair<int, my_off_t> result = flush_thread_caches(head);
total_bytes += result.second;
}
*out_queue_var = first_seen;
*total_bytes_var = total_bytes;
// 判斷 binlog 檔案是否需要切換
// 這里是在寫入之后判斷的, 因此一個事務的binlog都位于同一個檔案中
if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t)max_size)
*rotate_var = true;
}
THD *fetch_queue_for(StageID stage)
{
return m_queue[stage].fetch_and_empty();
}
/** Flush InnoDB redo logs to the file system.
* flush innodb redo logs到檔案系統中,
如果在 flush 階段被 binlog 組提交呼叫, 則為 true, 其他情況為 false.
@return false */
static bool
innobase_flush_logs(
handlerton *hton,
bool binlog_group_flush)
{
DBUG_ENTER("innobase_flush_logs");
assert(hton == innodb_hton_ptr);
// read only, return false
if (srv_read_only_mode)
{
DBUG_RETURN(false);
}
// 如果 binlog_group_flush 為 true, 則我們在 flush 階段被 binlog 組提交呼叫, 否則為 false,
// innodb_flush_log_at_trx_commit = 0 的情況下, write & sync 每秒一次, 不需要在 binlog group commit時提交,
if (binlog_group_flush && srv_flush_log_at_trx_commit == 0)
{
/* innodb_flush_log_at_trx_commit=0
(write and sync once per second).
Do not flush the redo log during binlog group commit. */
DBUG_RETURN(false);
}
/* 將 redo log buffer 重繪到 redo log file檔案中,
如果在 flush logs 或者 srv_flush_log_at_trx_commit = 1則進行 flush,
*/
log_buffer_flush_to_disk(!binlog_group_flush || srv_flush_log_at_trx_commit == 1);
DBUG_RETURN(false);
}
/**
* 為 commit group 中每個事務生成 GTID
*/
bool MYSQL_BIN_LOG::assign_automatic_gtids_to_flush_group(THD *first_seen)
{
bool error = false;
bool is_global_sid_locked = false;
rpl_sidno locked_sidno = 0;
// 遍歷 flush 佇列中每個事務
for (THD *head = first_seen; head; head = head->next_to_commit)
{
/* Generate GTID
* 為每個事務生成 GTID
*/
if (head->variables.gtid_next.type == AUTOMATIC_GROUP)
{
if (!is_global_sid_locked)
{
global_sid_lock->rdlock();
is_global_sid_locked = true;
}
// 為指定的事務生成 GTID
if (gtid_state->generate_automatic_gtid(head,
head->get_transaction()->get_rpl_transaction_ctx()->get_sidno(),
head->get_transaction()->get_rpl_transaction_ctx()->get_gno(),
&locked_sidno) != RETURN_STATUS_OK)
}
else
{
if (head->variables.gtid_next.type == GTID_GROUP)
assert(head->owned_gtid.sidno > 0);
else
{
assert(head->variables.gtid_next.type == ANONYMOUS_GROUP);
assert(head->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS);
}
}
}
DBUG_RETURN(error);
}
/**
Flush caches for session.
將執行緒的 binlog cache 重繪到 binlog 日志檔案
首先將 gtid event 直接寫入 binlog,
再將 other event 寫入到 binlog cache 中
set_trans_pos 是一個指向二進制日志當前使用的檔案名的指標呼叫, rotation 將改變這個變數的內容,
*/
std::pair<int, my_off_t>
MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
{
// cache_mgr
binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
my_off_t bytes = 0;
bool wrote_xid = false;
// binlog cache 進行 flush 操作,
// 首先將 gtid event 直接寫入 binlog,
// 再將 other event 寫入到 binlog cache 中
int error = cache_mngr->flush(thd, &bytes, &wrote_xid);
if (!error && bytes > 0)
{
thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
if (wrote_xid)
inc_prep_xids(thd);
}
DBUG_PRINT("debug", ("bytes: %llu", bytes));
return std::make_pair(error, bytes);
}
/*
Convenience method to flush both caches to the binary log.
// 對 binlog cache 進行 flush 操作,
*/
int flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
my_off_t stmt_bytes = 0;
my_off_t trx_bytes = 0;
assert(stmt_cache.has_xid() == 0);
// 對指定執行緒的 binlog cache 進行 flush 操作,
// 首先將 gtid event 直接寫入 binlog,
// 再將 other event 寫入到 binlog cache 中,
int error = stmt_cache.flush(thd, &stmt_bytes, wrote_xid);
if (error)
return error;
DEBUG_SYNC(thd, "after_flush_stm_cache_before_flush_trx_cache");
if (int error = trx_cache.flush(thd, &trx_bytes, wrote_xid))
return error;
*bytes_written = stmt_bytes + trx_bytes;
return 0;
}
/**
Flush caches to the binary log.
將快取 flush 到二進制日志檔案,
首先將 gtid event 直接寫入 binlog,
再將 other event 寫入到 binlog cache 中,
If the cache is finalized, the cache will be flushed to the binary
log file. If the cache is not finalized, nothing will be done.
如果 cache 確定, 則把 cache flush到二進制日志檔案,如果 cache 尚未最終確定, 則不做任何操作,
If flushing fails for any reason, an error will be reported and the
cache will be reset. Flushing can fail in two circumstances:
如果由于任何原因 flush 失敗, 則報告錯誤并重置快取,flush可能存在如下兩種方式的失敗:
- It was not possible to write the cache to the file. In this case,
it does not make sense to keep the cache.
無法將 cache 寫入檔案, 這種情況下, 保存快取是沒有意義的,
- The cache was successfully written to disk but post-flush actions
(such as binary log rotation) failed. In this case, the cache is
already written to disk and there is no reason to keep it.
cache已成功寫入磁盤, 但是之后的操作失敗, 例如 binlog rotation,這種情況下,
快取已經寫入了磁盤, 沒有必要保留他,
@see binlog_cache_data::finalize
*/
int binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
int error = 0;
if (flags.finalized)
{
// bytes_in_cache
my_off_t bytes_in_cache = my_b_tell(&cache_log);
// trx
Transaction_ctx *trn_ctx = thd->get_transaction();
// seq_no
trn_ctx->sequence_number = mysql_bin_log.m_dependency_tracker.step();
// last_committed
if (trn_ctx->last_committed == SEQ_UNINIT)
trn_ctx->last_committed = trn_ctx->sequence_number - 1;
/*
如果事務已經寫入陳述句快取, 則在flush陳述句快取之前寫入 GTID 資訊;
*/
Binlog_event_writer writer(mysql_bin_log.get_log_file());
/* The GTID ownership process might set the commit_error */
error = (thd->commit_error == THD::CE_FLUSH_ERROR ||
thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);
if (!error)
// gtid 寫入 binlog 檔案, 這里直接寫入磁盤 binlog 檔案, 沒有寫入 binlog file cache
if ((error = mysql_bin_log.write_gtid(thd, this, &writer)))
thd->commit_error = THD::CE_FLUSH_ERROR;
if (!error)
// 將其他 event 寫入 binlog file cache
error = mysql_bin_log.write_cache(thd, this, &writer);
if (flags.with_xid && error == 0)
*wrote_xid = true;
reset();
if (bytes_written)
*bytes_written = bytes_in_cache;
}
assert(!flags.finalized);
DBUG_RETURN(error);
}
/**
Flush the I/O cache to file.
將 IO cache flush到檔案中,
如果寫入了任何位元組, 則將binlog cache flush到檔案,
如果 flush 成功, 則發出 flush 成功信號,
*/
int MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
{
if (flush_io_cache(&log_file))
{
THD *thd = current_thd;
thd->commit_error = THD::CE_FLUSH_ERROR;
return ER_ERROR_ON_WRITE;
}
*end_pos_var = my_b_tell(&log_file);
return 0;
}
/**
* 更新 binlog end pos 點位, 通知 dump 執行緒發送 binlog
*/
void update_binlog_end_pos()
{
/*
binlog_end_pos 僅在 master binlog 上使用,
*/
if (is_relay_log)
signal_update();
else
{
// 持有 binlog end pos mutex.
lock_binlog_end_pos();
binlog_end_pos = my_b_tell(&log_file);
// 通知 dump 執行緒
signal_update();
// 釋放 binlog end pos mutex
unlock_binlog_end_pos();
}
}
void signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
signal_cnt++;
mysql_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
/**
Call fsync() to sync the file to disk.
sync binlog file 到磁盤中,
根據 sync_binlog 的設定決定是否刷盤,
*/
std::pair<bool, bool>
MYSQL_BIN_LOG::sync_binlog_file(bool force)
{
bool synced = false;
unsigned int sync_period = get_sync_period();
if (force || (sync_period && ++sync_counter >= sync_period))
{
sync_counter = 0;
if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1,
mysql_file_sync(log_file.file,
MYF(MY_WME | MY_IGNORE_BADFD))))
{
THD *thd = current_thd;
thd->commit_error = THD::CE_SYNC_ERROR;
return std::make_pair(true, synced);
}
synced = true;
}
return std::make_pair(false, synced);
}
/** Auxiliary function used in ordered_commit. 呼叫 after_sync hook. 增強半同步復制相關 after_sync 相關,
等待 dump 執行緒收到 slave 對于佇列中最大的 binlog_file 和 binlog_pos 的 ACK, */ static inline int call_after_sync_hook(THD *queue_head) { const char *log_file = NULL; my_off_t pos = 0; if (NO_HOOK(binlog_storage)) return 0; assert(queue_head != NULL); // 遍歷 queue 中的執行緒, 獲取到queue佇列中事務最大的 binlog file 和 pos for (THD *thd = queue_head; thd != NULL; thd = thd->next_to_commit) if (likely(thd->commit_error == THD::CE_NONE)) thd->get_trans_fixed_pos(&log_file, &pos); // 等待 dump 執行緒收到最大的 binlog file 和 pos 的 ACK, if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) || RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos))) { return ER_ERROR_ON_WRITE; } return 0; }
/**
提交一系列會話,
這個函式用于提交從 first 開始的佇列中的會話,
如果 ordered commit 的 flushing 階段出錯, 則會傳入err code, 并標記所有執行緒,
還會將事務的 GTID 添加到 gtid_executed 中,
*/
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
{
// 持有 lock_commit lock.
mysql_mutex_assert_owner(&LOCK_commit);
// 遍歷 commit 佇列中的執行緒
for (THD *head = first; head; head = head->next_to_commit)
{
/*
如果 flush 失敗, 則為 session 設定 commit_error, 跳過改事務并繼續下一個事務,
這會將所有的執行緒標記失敗, 因為 flush 失敗,
如果 flush 成功, 鏈接上session并在引擎中提交,
*/
// seq_no != 0
if (head->get_transaction()->sequence_number != SEQ_UNINIT)
{
// lock LOCK_slave_trans_dep_tracker
mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
// 更新提交的最大的 seqno, last_commit
m_dependency_tracker.update_max_committed(head);
mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
}
/*
應忽略 flush/sync 錯誤并繼續提交階段,
因此此時 thd->commit_error 不能是 COMMIT_ERROR,
*/
// 是否是一個真正的提交
bool all = head->get_transaction()->m_flags.real_commit;
// 如果 commit_low
if (head->get_transaction()->m_flags.commit_low)
{
/*
storage engine commit
存盤引擎提交,
*/
if (ha_commit_low(head, all, false))
head->commit_error = THD::CE_COMMIT_ERROR;
}
}
/*
Handle the GTID of the threads.
gtid_executed table is kept updated even though transactions fail to be
logged. That's required by slave auto positioning.
處理各session的 GTID,
即使無法記錄事務, gtid_executed 表也會保持更新,這是 slave auto position 必須的,
*/
// 更新 gtid_executed
gtid_state->update_commit_group(first);
for (THD *head = first; head; head = head->next_to_commit)
{
/*
在存盤引擎提交后減少準備好的 XID 計數器,
當遇到 flush 錯誤或session轉儲存盤時, 還是需要減少準備好的 XID, 以避免用戶執行緒, rotate執行緒
和 dump執行緒之間的三向死鎖,
*/
if (head->get_transaction()->m_flags.xid_written)
// 減少 xid 計數器
dec_prep_xids(head);
}
}
/**
在 Innodb 中 commit 一個事務, 并標記 SQL 陳述句結束,*/
static int
innobase_commit(
/*============*/
handlerton *hton, /*!< in: InnoDB handlerton ; 存盤引擎 */
THD *thd, /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed ; mysql thread headle */
bool commit_trx) /*!< in: true - commit transaction
false - the current SQL statement
ended ; true: 事務提交 false: 當前 SQL 陳述句結束 */
{
// 獲取 mysql handler 物件的 innodb trx, 如果相應的 MySQL執行緒結構缺乏一個則創建一個 innodb事務物件,
trx_t *trx = check_trx_exists(thd);
TrxInInnoDB trx_in_innodb(trx);
// 如果事務已標記異步回滾
if (trx_in_innodb.is_aborted())
{
// 進行回滾
innobase_rollback(hton, thd, commit_trx);
DBUG_RETURN(convert_error_code_to_mysql(
DB_FORCED_ABORT, 0, thd));
}
/*
事務僅在 commit 或者回滾時取消注冊,
如果取消注冊, 我們不能立即釋放資源, 我們可以立即回傳,
目前, 雖然沒有什么需要清理的, 但我們還是謹慎行事, 進行清理,
*/
if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx))
{
}
// read_only
bool read_only = trx->read_only || trx->id == 0;
// 如果是 事務 commit
if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
/*
我們正在提交整個事務, 或者這是一個SQL陳述句結束并且 autocommit = on;
我們需要當前的 binlog 位置才能讓 mysqlbackup 作業,
*/
// 不是只讀事務
if (!read_only)
{
// 依據引數 innobase_commit_concurrency 來判斷是否有過多的執行緒同時提交,
while (innobase_commit_concurrency > 0)
{
// 獲取 commit_cond_m mutex
mysql_mutex_lock(&commit_cond_m);
// commit_threads + 1
++commit_threads;
// 正在提交的事務小于 innodb_commit_concurrency, 則釋放 commit_cond_m mutex, 進入提交流程
if (commit_threads <= innobase_commit_concurrency)
{
mysql_mutex_unlock(&commit_cond_m);
break;
}
--commit_threads;
mysql_cond_wait(&commit_cond, &commit_cond_m);
mysql_mutex_unlock(&commit_cond_m);
}
/*
下面的呼叫讀取正在提交的事務的 binlog pos.
其他引擎的二進制日志記錄和 Innodb 無關, 因為 Innodb 要求的是提交 Innodb 事務在 MySQL 二進制日志
中的順序和 Innodb 日志中的順序相同, 這是由 server 保證的,
如果沒有指定 binary log, 或者事務沒有寫 binlog, file name將是一個空指標,
*/
ulonglong pos;
// 獲取thd最新寫入的 binlog 的 pos
thd_binlog_pos(thd, &trx->mysql_log_file_name, &pos);
trx->mysql_log_offset = static_cast<int64_t>(pos);
/* Don't do write + flush right now. For group commit
to work we want to do the flush later.
現在不要進行 flush, 對于組提交, 我們希望之后再 flush,
*/
trx->flush_log_later = true;
}
// innobase commit, innodb層的提交
innobase_commit_low(trx);
// 非 read_only
if (!read_only)
{
trx->flush_log_later = false;
// 當前正在提交的執行緒 -1
if (innobase_commit_concurrency > 0)
{
mysql_mutex_lock(&commit_cond_m);
ut_ad(commit_threads > 0);
--commit_threads;
mysql_cond_signal(&commit_cond);
mysql_mutex_unlock(&commit_cond_m);
}
}
// trx提交完成, 取消注冊
trx_deregister_from_2pc(trx);
/* Now do a write + flush of logs.
現在 write/flush logs.
*/
if (!read_only)
{
// write/flush 指定 trx lsn前的redo log到磁盤中,
// 這里會再次 flush redo log,
trx_commit_complete_for_mysql(trx);
}
}
else
{
/*
如果是非事務提交
僅僅需要釋放指定SQL陳述句對應的表上的 auto-inc 鎖,
*/
if (!read_only)
{
lock_unlock_table_autoinc(trx);
}
/*
存盤當前事務的 undo_no, 以便如果需要roll back下一個SQL陳述句時直到rollback到哪里,
*/
trx_mark_sql_stat_end(trx);
}
/* Reset the number AUTO-INC rows required */
trx->n_autoinc_rows = 0;
/* This is a statement level variable. */
trx->fts_next_doc_id = 0;
// 強制一個執行緒 leave innodb, 即使他有多余的 tickets.
innobase_srv_conc_force_exit_innodb(trx);
DBUG_RETURN(0);
}
/**
Process after commit for a sequence of sessions.
處理 after_commit.
*/
void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
// 遍歷 commit 佇列中的執行緒
for (THD *head = first; head; head = head->next_to_commit)
{
if (head->get_transaction()->m_flags.run_hooks &&
head->commit_error != THD::CE_COMMIT_ERROR)
{
/*
hook 可能會移動到 if 之外, 并且這可能是唯一的一處 after_commit 呼叫,
*/
bool all = head->get_transaction()->m_flags.real_commit;
(void)RUN_HOOK(transaction, after_commit, (head, all));
head->get_transaction()->m_flags.run_hooks = false;
}
}
}
/* finish commit **/
int MYSQL_BIN_LOG::finish_commit(THD *thd)
{
if (unlikely(!is_open()))
{
// 清空 binlog cache臨時檔案和記憶體
binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
if (cache_mngr)
cache_mngr->reset();
}
if (thd->get_transaction()->sequence_number != SEQ_UNINIT)
{
mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
// 更新 max_commit
m_dependency_tracker.update_max_committed(thd);
mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
}
if (thd->get_transaction()->m_flags.commit_low)
{
const bool all = thd->get_transaction()->m_flags.real_commit;
/*
這里, flush error 和 sync error 將被忽略
*/
assert(thd->commit_error != THD::CE_COMMIT_ERROR);
/*
storage engine commit
存盤引擎提交
*/
if (ha_commit_low(thd, all, false))
thd->commit_error = THD::CE_COMMIT_ERROR;
if (thd->get_transaction()->m_flags.xid_written)
dec_prep_xids(thd);
/*
after_commit hook
*/
if ((thd->commit_error != THD::CE_COMMIT_ERROR) &&
thd->get_transaction()->m_flags.run_hooks)
{
(void)RUN_HOOK(transaction, after_commit, (thd, all));
thd->get_transaction()->m_flags.run_hooks = false;
}
}
else if (thd->get_transaction()->m_flags.xid_written)
dec_prep_xids(thd);
if (!thd->owned_gtid.is_empty())
{
/*
更新 gtid_executed
*/
if (thd->commit_error == THD::CE_NONE)
{
gtid_state->update_on_commit(thd);
}
else
gtid_state->update_on_rollback(thd);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/457566.html
標籤:MySQL
