主頁 > 資料庫 > MySQL事務提交流程詳解

MySQL事務提交流程詳解

2022-04-08 09:02:51 資料庫

MySQL事務的提交采用兩階段提交協議, 前些日子和同事聊的時候發現對提交的細節還是有些模糊,這里對照MySQL原始碼詳細記錄一下,版本是MySQL5.7.36,

一. 事務的提交流程,

 

1. 獲取 MDL_key::COMMIT 鎖: FTWRL會阻塞 commit 操作,
-------------------------------- 接下來進入 prepare 階段:
2. binlog prepare: 將上一次 commit 佇列中的最大的 seq_no 寫入本次事務的 last_commit 中,【用戶發起的顯式提交(顯式commit), 或者DDL發起的隱式提交沒有這一步】
3. innodb prepare:
3.1 獲取執行緒的 xid, 設定為事務的 xid.
3.2 修改Innodb中事務的狀態為 Prepare 狀態
3.3 將 undo 日志端從 active 設定為 prepare 狀態, 并在undo段的第一個undo的last undo log header中寫入xid.
-------------------------------- 接下來進入 commit 階段:
4. XID_Event生成并寫入binlog cache中,會首先將事務中的寫操作生成的event flush到binlog cache中, 再寫入 xid event,這也符合binlog中事務event順序,
5. binlog flush:
5.1. 當前執行緒加入flush佇列, 如果flush佇列是空的, 則當前為leader; 如果為非空, 則為 follower, 非leader執行緒將被阻塞, 直到commit之后被leader執行緒喚醒, 完成提交,
5.2. 獲取Lock log鎖
5.3. 對flush佇列進行fetch, 本次處理的佇列就固定了
5.4. 在innodb存盤引擎中flush redo log, 做 innodb層redo持久化
5.5. 為flush佇列中每個事務生成gtid
5.6. 將flush佇列中每個執行緒的binlog cache flush到 binlog檔案中, 這里包含三步:
a. 直接將 GTID event 寫入 binlog磁盤檔案中
b. 將事務生成的別的 event 寫入 binlog file cache中
c. 將 binlog cache[IO cache] flush到檔案
5.7. 判斷 binlog 是否需要切換, 設定切換標記,注意:這里是將事務的event寫入binlog file cache后再判斷, 因此一個事務的binlog都位于同一個binlog檔案中
5.8. after_flush hook:如果sync_binlog != 1, 那么在這里更新 binlog 點位, 通知dump執行緒向從節點發送 event
6. binlog sync:
6.1. 形成 sync 佇列, 第一個進入 sync 佇列的leader為本階段的laeder, 其他flush佇列加入sync佇列的leader執行緒將被阻塞, 直到commit階段被leader執行緒喚醒, finish_commit
6.2. 釋放 Lock log mutex, 獲取 Lock sync mutex
6.3. 根據 delay 的設定來決定是否延遲一段時間, 使得sync佇列變大, last commit是在binlog prepare時生成, 這時last commit尚未修改, 因此加入sync佇列的事務是同一組事務, 可以提高從庫 mts 效率,
6.4. fetch sync佇列, 對 sync 佇列進行固化
6.5. sync binlog file到磁盤中, 需要根據sync_binlog的設定來決定是否刷盤
6.6. 如果 sync_binlog = 1, 那么更新 binlog end pos, 通知 dump執行緒發送 event
7. commit階段:
如果需要按順序提交: order_commits:
7.1. sync佇列加入commit佇列, 第一個進入的 sync 佇列的leader為本階段的leader, 其他sync佇列加入commit佇列的leader會被阻塞, 直到 commit階段后被leader執行緒喚醒, 進入 finish commit
7.2. 釋放 Lock sync mutex,獲取 lock commit mutex
7.3. fetch commit佇列, 對 commit 佇列進行固化
7.4. 呼叫 after_sync hook: 這里, 如果半同步復制為 after_sync, 則需要等待dump執行緒收到從節點對于commit佇列中最大的binlog filename和 pos的ack,
7.5. 在 Innodb 層提交之前變更 last_commit, 將其變更為 commit 佇列中最大的 seqno
7.6. COMMIT佇列中每個事務按照順序進行存盤引擎層提交
7.7. 變更 gtid_executed
7.8. 釋放 lock commit mutex
如果不需要按順序提交:
7.1. 呼叫 after_sync hook: 這里, 如果半同步復制為 after_sync, 則需要等待dump執行緒收到從節點對于commit佇列中最大的binlog filename和 pos的ack,
8. 收尾:
8.1. leader 執行緒喚醒組內成員, 進行各自操作
8.2. commit 佇列中事務清空 binlog cache 臨時檔案和記憶體
8.3. 如果不需要按順序提交:則commit佇列中執行緒各自進行存盤引擎層的提交, 提交完成之后更新 gtid_executed
8.4. 決定是否進行 binlog 的 rotate
8.5. 如果 rotate 了 binlog, 則根據 expire_log_days 判斷是否需要清理 binlog

二. 流程代碼,

主代碼如下,洗掉了部分輔助代碼,從 trans_commit(THD *thd) 函式開始,

bool trans_commit(THD *thd)
{ // 提交事務, res = ha_commit_trans(thd, TRUE); if (res == FALSE) if (thd->rpl_thd_ctx.session_gtids_ctx().notify_after_transaction_commit(thd)) sql_print_warning("Failed to collect GTID to send in the response packet!"); thd->server_status &= ~SERVER_STATUS_IN_TRANS; thd->variables.option_bits &= ~OPTION_BEGIN; thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION); thd->lex->start_transaction_opt = 0; /* The transaction should be marked as complete in P_S. */ assert(thd->m_transaction_psi == NULL); thd->tx_priority = 0; trans_track_end_trx(thd); DBUG_RETURN(MY_TEST(res)); }

  

/*    
    提交事務,
    server層最后呼叫函式 ha_commit_trans(), 該函式負責處理 binlog 層和存盤引擎層的提交,
*/
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock)
{
    // 讀寫事務 && 不能忽略全域讀鎖
    if (rw_trans && !ignore_global_read_lock)
    {
      /*
        獲取一個 MDL_KEY::COMMIT 元資料鎖, 該元資料鎖將確保 commit 操作會被活躍的 FTWRL 鎖阻止,
        FTWRL鎖會阻塞 COMMIT 操作,
      */
      MDL_REQUEST_INIT(&mdl_request,
                       MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
                       MDL_EXPLICIT);
      DBUG_PRINT("debug", ("Acquire MDL commit lock"));
      // 申請 MDL_key::COMMIT 鎖, 申請失敗
      if (thd->mdl_context.acquire_lock(&mdl_request,
                                        thd->variables.lock_wait_timeout))
      {
        ha_rollback_trans(thd, all);
        DBUG_RETURN(1);
      }
      release_mdl = true;
    }
    // 判斷是否開啟 xa 事務;
    // 所有的 entries 都支持 2pc && 在事務 scope 中設定做讀寫更改的引擎數量 > 1
    if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
      // prepare; 在事務協調器中 prepare commit tx, 在引擎層生成一個 XA 事務,
      // tc_log: mysqld啟動時生成的 MySQL_BIN_LOG 物件[XA控制物件],
      error = tc_log->prepare(thd, all);
  }
  /*
    XA 事務的狀態變更為 prepared, 中間態,最侄訓變成常規的 NOTR 狀態,
  */
  if (!error && all && xid_state->has_state(XID_STATE::XA_IDLE))
  {
    assert(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
           static_cast<Sql_cmd_xa_commit *>(thd->lex->m_sql_cmd)->get_xa_opt() == XA_ONE_PHASE);
    // 設定 XA 事務狀態為 XA_PREPARED 狀態,
    xid_state->set_state(XID_STATE::XA_PREPARED);
  }
  /**
   * XA 事務提交
  */
  if (error || (error = tc_log->commit(thd, all)))
  {
    ha_rollback_trans(thd, all);
    error = 1;
    goto end;
  }
end:
  // 釋放 mdl 鎖,
  if (release_mdl && mdl_request.ticket)
  {
    thd->mdl_context.release_lock(mdl_request.ticket);
  }
/* * 釋放資源并執行其他清理,空事務也需要, */ if (is_real_trans) { trn_ctx->cleanup(); thd->tx_priority = 0; } }

  

int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
{
  /*
    設定 HA_IGNORE_DURABILITY 在 prepare 階段不將事務的 prepare record 刷到 innodb redo log,
    這樣在 binlog 組提交的 flush 階段 flushing binlog 之前 flush prepare record 到 innodb redo log,
    在 innodb prepare 時, 不刷 redo log.
  */
  thd->durability_property = HA_IGNORE_DURABILITY;
  //  在引擎中 prepare commit trx
  int error = ha_prepare_low(thd, all);
  DBUG_RETURN(error);
}

/**
 * prepare commit trx
 * 在引擎層 prepare commit trx
 * 包括 binlog引擎 和 innodb引擎
*/
int ha_prepare_low(THD *thd, bool all)
{
  // 遍歷引擎
  if (ha_info)
  {
    for (; ha_info && !error; ha_info = ha_info->next())
    {
      int err = 0;
      // 引擎
      handlerton *ht = ha_info->ht();
      /*
        如果這個特定事務是只讀的, 不要呼叫兩階段提交,
      */
      if (!ha_info->is_trx_read_write())
        continue;
      /**
       * 呼叫引擎的 prepare 在存盤層生成 XA 事務,
       * 先 binlog prepare, 再 innodb prepare; 
       * binlog prepare: 將上一次 commit 佇列中最大的 seq num 寫入本次事務的 last_commit 中
       * innodb prepare: 在 innodb 中更改 undo 日志段的狀態為 trx_undo_prepared, 并將 xid 寫入 undo log header,
       * */
      if ((err = ht->prepare(ht, thd, all)))
      {
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
        error = 1;
      }
      // ha_prepare_count++
      thd->status_var.ha_prepare_count++;
    }
  }
}

  

/*
 * binlog prepare;  
*/
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
  if (!all)
  {
    // 將上一次 commit 佇列中最大的 seq number 寫入本次事務的 last_commit 中,
    thd->get_transaction()->store_commit_parent(mysql_bin_log.m_dependency_tracker.get_max_committed_timestamp());
  }
  DBUG_RETURN(all && is_loggable_xa_prepare(thd) ? mysql_bin_log.commit(thd, true) : 0);
}

/*******************************************************************/ /**
Innodb prepare 一個 X/Open XA 分布式事務,
static int
innobase_xa_prepare(
		/*================*/
		handlerton *hton, /*!< in: InnoDB handlerton ; innodb引擎 */
		THD *thd,					/*!< in: handle to the MySQL thread of
					the user whose XA transaction should
					be prepared ; mysql執行緒 */
		bool prepare_trx) /*!< in: true - prepare transaction
					false - the current SQL statement
					ended ; true: prepare 事務 
							false: 當前 SQL 陳述句結束, 陳述句級別的提交 */
{
	// trx
	trx_t *trx = check_trx_exists(thd);
	// 獲取thd的 xid, 同時設定到 trx -> xid 中
	thd_get_xid(thd, (MYSQL_XID *)trx->xid);

	/* 釋放可能的 FIFO ticket 和 search latch,
	因為我們要保留 trx_sys -> mutex, 我們必須首先釋放 search system latch 來遵守鎖存順序,
	*/
	trx_search_latch_release_if_reserved(trx);
	// prepare trx
	if (prepare_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
	{
		/* preapre 整個事務, 或者這是一個SQL陳述句結束, autocommit 是打開狀態 */
		// 事務已經在 mysql 2pc 協調器中注冊,
		ut_ad(trx_is_registered_for_2pc(trx));
		// trx prepare
		dberr_t err = trx_prepare_for_mysql(trx);
	}
	else
	{
		/* 陳述句的提交動作, 而非真正的事務提交, */
		// 需要釋放陳述句 hold 的 auto_increment 鎖
		lock_unlock_table_autoinc(trx);

		// 記錄本陳述句的 undo 資訊, 以便陳述句級的回滾
		// 標記最新SQL陳述句結束,
		trx_mark_sql_stat_end(trx);
	}
	return (0);
}

/**
 * trx prepare
*/
dberr_t
trx_prepare_for_mysql(trx_t *trx)
{
	trx->op_info = "preparing";
	// prepare trx.
	trx_prepare(trx);
}

/****************************************************************/ /**
prepare trx.*/
static void
trx_prepare(
		/*========*/
		trx_t *trx) /*!< in/out: transaction */
{
	// 回滾段 != NULL && redo 段被修改
	if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx))
	{
		// 為指定的回滾段 preapre 一個事務,lsn 為當前已 commit 的 lsn
		lsn = trx_prepare_low(trx, &trx->rsegs.m_redo, false);
	}

	if (trx->rsegs.m_noredo.rseg != NULL && trx_is_noredo_rseg_updated(trx))
	{
		// 為指定的回滾段 preapre 一個事務,
		trx_prepare_low(trx, &trx->rsegs.m_noredo, true);
	}

	/*--------------------------------------*/
	// 事務狀態為 TRX_STATE_ACTIVE 狀態, 修改事務狀態
	trx->state = TRX_STATE_PREPARED;
	// 事務系統中處于 xa prepared 狀態的事務的數量
	trx_sys->n_prepared_trx++;
	/*--------------------------------------*/
	/* Release read locks after PREPARE for READ COMMITTED
	and lower isolation. 
	對 rc 隔離級別, 在 prepare 之后釋放 read locks, 降低隔離度
	*/
	if (trx->isolation_level <= TRX_ISO_READ_COMMITTED)
	{
		/* Stop inheriting GAP locks. 
		停止繼承 GAP lock,
		*/
		trx->skip_lock_inheritance = true;

		/* Release only GAP locks for now. 
		釋放 GAP lock,
		*/
		lock_trx_release_read_locks(trx, true);
	}
	switch (thd_requested_durability(trx->mysql_thd))
	{
	case HA_IGNORE_DURABILITY:
		/* 
		在 binlog group commit 的 prepare 階段, 我們設定 HA_IGNORE_DURABILITY , 這樣在這個階段不會 flush redo log,
		這樣我們就可以在 binlog group commit 的 flush 階段在將 binary log寫入二進制日志之前, 在一個組中 flush redo log,
		*/
		break;
	case ..
	}
}


/****************************************************************/ /**
為指定的回滾段 preapre 一個事務, */
static lsn_t
trx_prepare_low(
		/*============*/
		trx_t *trx,								/*!< in/out: transaction */
		trx_undo_ptr_t *undo_ptr, /*!< in/out: pointer to rollback
					segment scheduled for prepare. 指向回滾段的指標 */
		bool noredo_logging)			/*!< in: turn-off redo logging. 不需要redo log */
{
	lsn_t lsn;
	// insert 或者 undo 回滾段不為 NULL
	if (undo_ptr->insert_undo != NULL || undo_ptr->update_undo != NULL)
	{
		// start a sync mtr
		mtr_start_sync(&mtr);
		// 設定 mtr mode
		if (noredo_logging)
		{
			mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
		}

		/* 
		將  undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared: 
		更改 undo 回滾段將其設定為 prepare 狀態,
		*/
		mutex_enter(&rseg->mutex);
		// insert undo log 不為 NULL
		if (undo_ptr->insert_undo != NULL)
		{
			/* 
			這里不需要獲取 trx->undo_mutex, 因為只允許一個 OS 執行緒為該事務做事務準備,
			*/
			// 將 undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
			trx_undo_set_state_at_prepare(
					trx, undo_ptr->insert_undo, false, &mtr);
		}
		// 將 undo 日志段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
		if (undo_ptr->update_undo != NULL)
		{
			trx_undo_set_state_at_prepare(
					trx, undo_ptr->update_undo, false, &mtr);
		}

		mutex_exit(&rseg->mutex);
		lsn = mtr.commit_lsn();
	}
	else
	{
		lsn = 0;
	}
	return (lsn);
}

/* 修改 undo 日志段的狀態*/
page_t *
trx_undo_set_state_at_prepare(
		trx_t *trx,
		trx_undo_t *undo,
		bool rollback,
		mtr_t *mtr)
{
	// 獲取 undo page 頁, 并在其上加 x-latch
	undo_page = trx_undo_page_get(
			page_id_t(undo->space, undo->hdr_page_no),
			undo->page_size, mtr);
	// undo 段 header
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
	// 如果是 XA rollback
	if (rollback)
	{
		ut_ad(undo->state == TRX_UNDO_PREPARED);
		// 將 undo 段的狀態從 TRX_UNDO_PREPARED 修改為 TRX_UNDO_ACTIVE 狀態
		mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, TRX_UNDO_ACTIVE,
										 MLOG_2BYTES, mtr);
		return (undo_page);
	}
	/*------------------------------*/
	// 是 XA prepare, 則將 undo 段的狀態從 TRX_UNDO_ACTIVE 修改為 TRX_UNDO_PREPARED, 并將 xid 寫入 undo,
	ut_ad(undo->state == TRX_UNDO_ACTIVE);
	undo->state = TRX_UNDO_PREPARED;
	undo->xid = *trx->xid;
	/*------------------------------*/
	// 在 undo 段中更新當前 undo 段的狀態
	mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, undo->state,
									 MLOG_2BYTES, mtr);
	// 在 undo 段 last undo log header 中寫入 xid
	offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG);
	undo_header = undo_page + offset;
	mlog_write_ulint(undo_header + TRX_UNDO_XID_EXISTS,
									 TRUE, MLOG_1BYTE, mtr);
	trx_undo_write_xid(undo_header, &undo->xid, mtr);
	return (undo_page);
}

  

/*
  在事務協調器中提交事務,
  該函式將在二進制日志和存盤引擎中提交事務,
*/
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
{
    ........
    {
      /* The prepare phase of XA transaction two phase logging. */
      int err = 0;
      bool one_phase = get_xa_opt(thd) == XA_ONE_PHASE;

      assert(thd->lex->sql_command != SQLCOM_XA_COMMIT || one_phase);
      // xid event 生成并寫入 binlog cache, 在真正的寫操作陳述句生成的event之后
      XID_STATE *xs = thd->get_transaction()->xid_state();
      XA_prepare_log_event end_evt(thd, xs->get_xid(), one_phase);
      err = cache_mngr->trx_cache.finalize(thd, &end_evt, xs);
      ........
    }
    trx_stuff_logged = true;
  }
  
     ........
   
     
    // 提交,
    // ordered_commit: 事務在 binlog 階段提交的核心函式,
    if (ordered_commit(thd, all, skip_commit))
      DBUG_RETURN(RESULT_INCONSISTENT);

    /*
      Mark the flag m_is_binlogged to true only after we are done
      with checking all the error cases.
      檢查完所有錯誤情況后, 將標記 m_is_binlogged 標記為 true.
    */
    if (is_loggable_xa_prepare(thd))
      thd->get_transaction()->xid_state()->set_binlogged();
  }
  DBUG_RETURN(RESULT_SUCCESS);
}



/**
  Flush and commit the transaction.

  This will execute an ordered flush and commit of all outstanding
  transactions and is the main function for the binary log group
  commit logic. The function performs the ordered commit in two
  phases.

  The first phase flushes the caches to the binary log and under
  LOCK_log and marks all threads that were flushed as not pending.

  The second phase executes under LOCK_commit and commits all
  transactions in order.

  The procedure is:

  1. Queue ourselves for flushing.
  2. Grab the log lock, which might result is blocking if the mutex is
     already held by another thread.
  3. If we were not committed while waiting for the lock
     1. Fetch the queue
     2. For each thread in the queue:
        a. Attach to it
        b. Flush the caches, saving any error code
     3. Flush and sync (depending on the value of sync_binlog).
     4. Signal that the binary log was updated
  4. Release the log lock
  5. Grab the commit lock
     1. For each thread in the queue:
        a. If there were no error when flushing and the transaction shall be committed:
           - Commit the transaction, saving the result of executing the commit.
  6. Release the commit lock
  7. Call purge, if any of the committed thread requested a purge.
  8. Return with the saved error code
*/
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
  /*
    Stage #1: flushing transactions to binary log
    階段1: 將事務 flush 到二進制日志
    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
    在 flush 時, 允許新的執行緒進入, 并在適當的時間處理他們,
    一旦佇列變空, 我們就不能再識訓任何東西了, 因為可能有一個執行緒進入了佇列并
    指定自己為flush階段的 leader,
  */

#ifdef HAVE_REPLICATION
  /**
   * 先形成 flush 佇列, 非 leader 執行緒將被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
   * 然后leader執行緒獲取 Lock log鎖
  */
  if (has_commit_order_manager(thd))
  {
    Slave_worker *worker = dynamic_cast<Slave_worker *>(thd->rli_slave);
    Commit_order_manager *mngr = worker->get_commit_order_manager();

    if (mngr->wait_for_its_turn(worker, all))
    {
      thd->commit_error = THD::CE_COMMIT_ERROR;
      DBUG_RETURN(thd->commit_error);
    }
    // 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到commit之后被 leader 執行緒喚醒, 非 leader 執行緒這里回傳 true, 執行緒應該等待提交完成,
    if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
      DBUG_RETURN(finish_commit(thd));
  }
  else
#endif
      // 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到被 leader 執行緒喚醒, 非 leader 執行緒這里回傳 true, 執行緒應該等待提交完成,
      if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
  {
    DBUG_RETURN(finish_commit(thd));
  }

  THD *wait_queue = NULL, *final_queue = NULL;
  mysql_mutex_t *leave_mutex_before_commit_stage = NULL;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;
  DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage");
  // 執行 flush 階段操作,
  /*
  * 1. 對 flush 佇列進行 fetch, 本次處理的flush佇列就固定了
    2. 在 innodb 存盤引擎中 flush redo log, 做 innodb 層 redo 持久化,
    3. 為 flush 佇列中每個事務生成 gtid,
    4. 將 flush佇列中每個執行緒的 binlog cache flush 到 binlog 日志檔案中,這里包含兩步:
            1. 將事務的 GTID event直接寫入 binlog 磁盤檔案中
            2. 將事務生成的別的 event 寫入 binlog file cache 中
  */
  flush_error = process_flush_stage_queue(&total_bytes, &do_rotate,
                                          &wait_queue);
  // 將 binary log cache(IO cache) flush到檔案中
  if (flush_error == 0 && total_bytes > 0)
    flush_error = flush_cache_to_file(&flush_end_pos);
  // sync_binlog 是否等于 1
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);

  /*
    如果 flush 操作成功, 則呼叫 after_flush hook,
  */
  if (flush_error == 0)
  {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos)))
    {
      sql_print_error("Failed to run 'after_flush' hooks");
      flush_error = ER_ERROR_ON_WRITE;
    }
    // 不等于 1, 通知 dump 執行緒
    if (!update_binlog_end_pos_after_sync)
      // 更新 binlog end pos, 通知 dump 執行緒向從庫發送 event
      update_binlog_end_pos();
    DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
  }

  if (flush_error)
  {
    /*
      Handle flush error (if any) after leader finishes it's flush stage.
      如果存在 flush 錯誤, 則處理 flush錯誤
    */
    handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */,
                                      (thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR)
                                          ? ER(ER_GNO_EXHAUSTED)
                                          : NULL);
  }
  /*
    Stage #2: Syncing binary log file to disk
    sync binary log file to disk.
  */
  /** 釋放 Lock_log mutex, 獲取 Lock_sync mutex
   *  第一個進入的 flush 佇列的 leader 為本階段的 leader, 其他 flush 佇列加入 sync 佇列, 其他 flush 佇列的
   * leader會被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
   * */
  if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
  {
    DBUG_RETURN(finish_commit(thd));
  }

  /*
    根據 delay 的設定來決定是否延遲一段時間, 如果 delay 的時間越久, 那么加入 sync 佇列的
    事務就越多【last commit 是在 binlog prepare 時生成的, 尚未更改, 因此加入 sync 佇列的
    事務是同一組事務】, 提高了從庫 mts 的效率,
  */
  if (!flush_error && (sync_counter + 1 >= get_sync_period()))
    stage_manager.wait_count_or_timeout(opt_binlog_group_commit_sync_no_delay_count,
                                        opt_binlog_group_commit_sync_delay,
                                        Stage_manager::SYNC_STAGE);
  // fetch sync 佇列, 對 sync 佇列進行固化,
  final_queue = stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
  // 這里 sync_binlog file到磁盤中
  if (flush_error == 0 && total_bytes > 0)
  {
    // 根據 sync_binlog 的設定決定是否刷盤
    std::pair<bool, bool> result = sync_binlog_file(false);
    sync_error = result.first;
  }
  // 在這里 sync_binlog = 1, 更新 binlog end_pos, 通知 dump 執行緒發送 event
  if (update_binlog_end_pos_after_sync)
  {
    THD *tmp_thd = final_queue;
    const char *binlog_file = NULL;
    my_off_t pos = 0;
    while (tmp_thd->next_to_commit != NULL)
      tmp_thd = tmp_thd->next_to_commit;
    if (flush_error == 0 && sync_error == 0)
    {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
      // 更新 binlog end pos, 通知 dump 執行緒
      update_binlog_end_pos(binlog_file, pos);
    }
  }

  leave_mutex_before_commit_stage = &LOCK_sync;
  /*
    Stage #3: Commit all transactions in order.
    按順序在 Innodb 層提交所有事務,  
    如果我們不需要對提交順序進行排序, 并且每個執行緒必須執行 handlerton 提交, 那么這個階段可以跳過,
    然而, 由于我們保留了前一階段的鎖, 如果我們跳過這個階段, 則必須進行解鎖,
  */
commit_stage:
  // 如果需要順序提交
  if (opt_binlog_order_commits &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER))
  {
    // SYNC佇列加入 COMMIT 佇列, 第一個進入的 SYNC 佇列的 leader 為本階段的 leader,其他 sync 佇列
    // 加入 commit 佇列的 leade 會被阻塞, 直到 COMMIT 階段后被 leader 執行緒喚醒,
    // 釋放 lock_sync mutex, 持有 lock_commit mutex.
    if (change_stage(thd, Stage_manager::COMMIT_STAGE,
                     final_queue, leave_mutex_before_commit_stage,
                     &LOCK_commit))
    {
      DBUG_RETURN(finish_commit(thd));
    }
    // 固化 commit 佇列
    THD *commit_queue = stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
    // 呼叫 after_sync hook
    if (flush_error == 0 && sync_error == 0)
      // 呼叫 after_sync hook.注意:對于after_sync, 這里將等待binlog dump 執行緒收到slave節點關于佇列中事務最新的 binlog_file和 binlog_pos的ACK,
      sync_error = call_after_sync_hook(commit_queue);

    /*
      process_commit_stage_queue 將為佇列中每個 thd 持有的 GTID 
      呼叫 update_on_commit 或 update_on_rollback,

      這樣做的目的是確保 gtid 按照順序添加到 GTIDs中, 避免出現不必要的間隙

      如果我們只允許每個執行緒在完成提交時呼叫 update_on_commit, 則無法保證 GTID
      順序, 并且 gtid_executed 之間可能出現空隙,發生這種情況, server必須從 
      Gtid_set 中添加和洗掉間隔, 添加或洗掉間隔需要一個互斥鎖, 這會降低性能,
    */
    process_commit_stage_queue(thd, commit_queue);
    // 退出 Lock_commit 鎖
    mysql_mutex_unlock(&LOCK_commit);
    /*
      Process after_commit after LOCK_commit is released for avoiding
      3-way deadlock among user thread, rotate thread and dump thread.
      在 LOCK_commit 釋放之后處理 after_commit 來避免 user thread, rotate thread 和 dump thread的
      3路死鎖,
    */
    process_after_commit_stage_queue(thd, commit_queue);
    final_queue = commit_queue;
  }
  else
  {
    // 釋放鎖, 呼叫 after_sync hook.
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }

  /*
    Handle sync error after we release all locks in order to avoid deadlocks
    為了避免死鎖, 在釋放所有的 locks 之后處理sync error
  */
  if (sync_error)
    handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */, NULL);

  /* Commit done so signal all waiting threads 
      commit完成之后通知所有處于 wait 狀態的執行緒
  */
  stage_manager.signal_done(final_queue);

  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
    在執行 rotate 之前完成commit, 否則可能出現死鎖,
  */
  (void)finish_commit(thd);

  /*
    If we need to rotate, we do it without commit error.
    Otherwise the thd->commit_error will be possibly reset.
    rotate
   */
  if (DBUG_EVALUATE_IF("force_rotate", 1, 0) ||
      (do_rotate && thd->commit_error == THD::CE_NONE &&
       !is_rotating_caused_by_incident))
  {
    /*
    如果需要進行 binlog rotate, 則進行 rotate 操作,
    */

    DEBUG_SYNC(thd, "ready_to_do_rotation");
    bool check_purge = false;
    mysql_mutex_lock(&LOCK_log);
    /*
      If rotate fails then depends on binlog_error_action variable
      appropriate action will be taken inside rotate call.
    */
    int error = rotate(false, &check_purge);
    mysql_mutex_unlock(&LOCK_log);

    if (error)
      thd->commit_error = THD::CE_COMMIT_ERROR;
    else if (check_purge)
      // rotate判斷是否需要 expire binlog.
      purge();
  }
  /*
    flush or sync errors are handled above (using binlog_error_action).
    Hence treat only COMMIT_ERRORs as errors.
  */
  DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR);
}
 

  

/**
  Enter a stage of the ordered commit procedure.
  進入有序提交階段,
  Entering is stage is done by:
  - Atomically enqueueing a queue of processes (which is just one for
    the first phase).
  將執行緒入隊,如果佇列是空的,那么執行緒是該階段的leader, 要處理整個佇列,
  - If the queue was empty, the thread is the leader for that stage
    and it should process the entire queue for that stage.
  如果佇列不是空的, 執行緒將作為 follower 等待commit結束,
  - If the queue was not empty, the thread is a follower and can go
    waiting for the commit to finish.
  The function will lock the stage mutex if it was designated the
  leader for the phase.
*/

bool MYSQL_BIN_LOG::change_stage(THD *thd,
                                 Stage_manager::StageID stage, THD *queue,
                                 mysql_mutex_t *leave_mutex,
                                 mysql_mutex_t *enter_mutex)
{
  assert(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
  assert(enter_mutex);
  assert(queue);
  /*
    一旦會話入隊, enroll_for 將釋放 leave_mutex
  */
  // 當前執行緒非 leader 執行緒, 非 leader 執行緒將被阻塞, 直到 commit 階段被 leader 執行緒喚醒,
  if (!stage_manager.enroll_for(stage, queue, leave_mutex))
  {
    assert(!thd_get_cache_mngr(thd)->dbug_any_finalized());
    DBUG_RETURN(true);
  }
  /*
   * 以下是 leader, 獲取  enter_mutex
  */
  bool need_lock_enter_mutex =
      !(is_rotating_caused_by_incident && enter_mutex == &LOCK_log);

  if (need_lock_enter_mutex)
    mysql_mutex_lock(enter_mutex);
  else
    mysql_mutex_assert_owner(enter_mutex);

  DBUG_RETURN(false);
}

 

// 回傳是否是 leader 執行緒, 非 leader 執行緒將被阻塞, 直到 commit階段被 leader執行緒喚醒
bool Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
{
  // 如果佇列是空的, 那么我們就是 leader
  bool leader = m_queue[stage].append(thd);

#ifdef HAVE_REPLICATION
  // 如果處于 flush 階段并且存在 commit_order
  if (stage == FLUSH_STAGE && has_commit_order_manager(thd))
  {
    // slave worker執行緒
    Slave_worker *worker = dynamic_cast<Slave_worker *>(thd->rli_slave);
    // slave worker 執行緒的 commit_order_manager
    Commit_order_manager *mngr = worker->get_commit_order_manager();
    // 取消注冊事務, worker為執行該事務的執行緒
    mngr->unregister_trx(worker);
  }
#endif
  /*
    如果 state_mutex 是 Lock_log, 那么當 binlog rotate時不應該解決, 在 rotation 時應該保持這個鎖,
  */
  bool need_unlock_stage_mutex =
      !(mysql_bin_log.is_rotating_caused_by_incident &&
        stage_mutex == mysql_bin_log.get_log_lock());

  if (stage_mutex && need_unlock_stage_mutex)
    mysql_mutex_unlock(stage_mutex);

  /*
    如果 queue不是empty, 那么我們是跟隨者并等待leader處理這個queue,
    如果我們持有一個 mutex, 那必須在 sleep 之前釋放這個mutex
  */
  // 非 leader 執行緒將被阻塞
  if (!leader)
  {
    mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
    /*
      Leader can be awaiting all-clear to preempt follower's execution.
      With setting the status the follower ensures it won't execute anything
      including thread-specific code.
    */
    thd->get_transaction()->m_flags.ready_preempt = 1;
    if (leader_await_preempt_status)
      mysql_cond_signal(&m_cond_preempt);
#endif
    while (thd->get_transaction()->m_flags.pending)
      mysql_cond_wait(&m_cond_done, &m_lock_done);
    mysql_mutex_unlock(&m_lock_done);
  }
  return leader;
}

  

/*
 執行 flush 階段,
*/
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                             bool *rotate_var,
                                             THD **out_queue_var)
{
  // 需要持有 LOCK_log 鎖
  mysql_mutex_assert_owner(&LOCK_log);

  /*
    獲取整個 flush 佇列并清空flush佇列, 對 flush 佇列進行固化,同時便于下一批事務可以使用 flush 佇列,
  */
  THD *first_seen = stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
  /*
    在 innodb 存盤引擎中 flush redo log, 做 innodb 層 redo 持久化,
  */
  ha_flush_logs(NULL, true);
  // 為 flush 佇列中每個事務生成 gtid
  assign_automatic_gtids_to_flush_group(first_seen);
  /* 
  重繪每個執行緒的 binlog cache 到 binlog 檔案中,
  */
  for (THD *head = first_seen; head; head = head->next_to_commit)
  {
    // 將執行緒的 binlog cache 重繪到 binlog 日志檔案
    // 首先將 gtid event 直接寫入 binlog,
    //  再將 other event 寫入到 binlog file cache 中
    std::pair<int, my_off_t> result = flush_thread_caches(head);
    total_bytes += result.second;
  }

  *out_queue_var = first_seen;
  *total_bytes_var = total_bytes;
  // 判斷 binlog 檔案是否需要切換
  // 這里是在寫入之后判斷的, 因此一個事務的binlog都位于同一個檔案中
  if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t)max_size)
    *rotate_var = true;
}

  

THD *fetch_queue_for(StageID stage)
{
    return m_queue[stage].fetch_and_empty();
}
/** Flush InnoDB redo logs to the file system.
 *  flush innodb redo logs到檔案系統中,
如果在 flush 階段被 binlog 組提交呼叫, 則為 true, 其他情況為 false.
@return false */
static bool
innobase_flush_logs(
		handlerton *hton,
		bool binlog_group_flush)
{
	DBUG_ENTER("innobase_flush_logs");
	assert(hton == innodb_hton_ptr);
	// read only, return false
	if (srv_read_only_mode)
	{
		DBUG_RETURN(false);
	}

	// 如果 binlog_group_flush 為 true, 則我們在 flush 階段被 binlog 組提交呼叫, 否則為 false,
	// innodb_flush_log_at_trx_commit = 0 的情況下, write & sync 每秒一次, 不需要在 binlog group commit時提交,
	if (binlog_group_flush && srv_flush_log_at_trx_commit == 0)
	{
		/* innodb_flush_log_at_trx_commit=0
		(write and sync once per second).
		Do not flush the redo log during binlog group commit. */
		DBUG_RETURN(false);
	}

	/* 將 redo log buffer 重繪到 redo log file檔案中,
	如果在 flush logs 或者 srv_flush_log_at_trx_commit = 1則進行 flush,
	*/
	log_buffer_flush_to_disk(!binlog_group_flush || srv_flush_log_at_trx_commit == 1);

	DBUG_RETURN(false);
}

  

/**
 * 為 commit group 中每個事務生成 GTID
*/
bool MYSQL_BIN_LOG::assign_automatic_gtids_to_flush_group(THD *first_seen)
{
  bool error = false;
  bool is_global_sid_locked = false;
  rpl_sidno locked_sidno = 0;
  // 遍歷 flush 佇列中每個事務
  for (THD *head = first_seen; head; head = head->next_to_commit)
  {
    /* Generate GTID 
     * 為每個事務生成 GTID
    */
    if (head->variables.gtid_next.type == AUTOMATIC_GROUP)
    {
      if (!is_global_sid_locked)
      {
        global_sid_lock->rdlock();
        is_global_sid_locked = true;
      }
      // 為指定的事務生成 GTID
      if (gtid_state->generate_automatic_gtid(head,
                                              head->get_transaction()->get_rpl_transaction_ctx()->get_sidno(),
                                              head->get_transaction()->get_rpl_transaction_ctx()->get_gno(),
                                              &locked_sidno) != RETURN_STATUS_OK)
    }
    else
    {
      if (head->variables.gtid_next.type == GTID_GROUP)
        assert(head->owned_gtid.sidno > 0);
      else
      {
        assert(head->variables.gtid_next.type == ANONYMOUS_GROUP);
        assert(head->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS);
      }
    }
  }
  DBUG_RETURN(error);
}

/**
   Flush caches for session.
   將執行緒的 binlog cache 重繪到 binlog 日志檔案
   首先將 gtid event 直接寫入 binlog,
   再將 other event 寫入到 binlog cache 中

   set_trans_pos 是一個指向二進制日志當前使用的檔案名的指標呼叫, rotation 將改變這個變數的內容,
 */
std::pair<int, my_off_t>
MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
{
  // cache_mgr
  binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
  my_off_t bytes = 0;
  bool wrote_xid = false;
  // binlog cache 進行 flush 操作,
  //   首先將 gtid event 直接寫入 binlog,
  //   再將 other event 寫入到 binlog cache 中
  int error = cache_mngr->flush(thd, &bytes, &wrote_xid);
  if (!error && bytes > 0)
  {
    thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
    if (wrote_xid)
      inc_prep_xids(thd);
  }
  DBUG_PRINT("debug", ("bytes: %llu", bytes));
  return std::make_pair(error, bytes);
}

/*
Convenience method to flush both caches to the binary log.
// 對 binlog cache 進行 flush 操作,
*/
int flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
	my_off_t stmt_bytes = 0;
	my_off_t trx_bytes = 0;
	assert(stmt_cache.has_xid() == 0);
	// 對指定執行緒的 binlog cache 進行 flush 操作,
	//   首先將 gtid event 直接寫入 binlog,
	//   再將 other event 寫入到 binlog cache 中,
	int error = stmt_cache.flush(thd, &stmt_bytes, wrote_xid);
	if (error)
	  return error;
	DEBUG_SYNC(thd, "after_flush_stm_cache_before_flush_trx_cache");
	if (int error = trx_cache.flush(thd, &trx_bytes, wrote_xid))
	  return error;
	*bytes_written = stmt_bytes + trx_bytes;
	return 0;
}

/**
  Flush caches to the binary log.
  將快取 flush 到二進制日志檔案,
  首先將 gtid event 直接寫入 binlog,
  再將 other event 寫入到 binlog cache 中,

  If the cache is finalized, the cache will be flushed to the binary
  log file. If the cache is not finalized, nothing will be done.
  如果 cache 確定, 則把 cache flush到二進制日志檔案,如果 cache 尚未最終確定, 則不做任何操作,

  If flushing fails for any reason, an error will be reported and the
  cache will be reset. Flushing can fail in two circumstances:
  如果由于任何原因 flush 失敗, 則報告錯誤并重置快取,flush可能存在如下兩種方式的失敗:

  - It was not possible to write the cache to the file. In this case,
    it does not make sense to keep the cache.
    無法將 cache 寫入檔案, 這種情況下, 保存快取是沒有意義的,

  - The cache was successfully written to disk but post-flush actions
    (such as binary log rotation) failed. In this case, the cache is
    already written to disk and there is no reason to keep it.
    cache已成功寫入磁盤, 但是之后的操作失敗, 例如 binlog rotation,這種情況下,
    快取已經寫入了磁盤, 沒有必要保留他,
  @see binlog_cache_data::finalize
 */
int binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
  int error = 0;
  if (flags.finalized)
  {
    // bytes_in_cache
    my_off_t bytes_in_cache = my_b_tell(&cache_log);
    // trx
    Transaction_ctx *trn_ctx = thd->get_transaction();
    // seq_no
    trn_ctx->sequence_number = mysql_bin_log.m_dependency_tracker.step();
    // last_committed
    if (trn_ctx->last_committed == SEQ_UNINIT)
      trn_ctx->last_committed = trn_ctx->sequence_number - 1;

    /*
      如果事務已經寫入陳述句快取, 則在flush陳述句快取之前寫入 GTID 資訊; 
    */
    Binlog_event_writer writer(mysql_bin_log.get_log_file());

    /* The GTID ownership process might set the commit_error */
    error = (thd->commit_error == THD::CE_FLUSH_ERROR ||
             thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);

    if (!error)
      // gtid 寫入 binlog 檔案, 這里直接寫入磁盤 binlog 檔案, 沒有寫入 binlog file cache
      if ((error = mysql_bin_log.write_gtid(thd, this, &writer)))
        thd->commit_error = THD::CE_FLUSH_ERROR;
    if (!error)
      // 將其他 event 寫入 binlog file cache
      error = mysql_bin_log.write_cache(thd, this, &writer);

    if (flags.with_xid && error == 0)
      *wrote_xid = true;

    reset();
    if (bytes_written)
      *bytes_written = bytes_in_cache;
  }
  assert(!flags.finalized);
  DBUG_RETURN(error);
}
/**
  Flush the I/O cache to file.
  將 IO cache flush到檔案中,

  如果寫入了任何位元組, 則將binlog cache flush到檔案,
  如果 flush 成功, 則發出 flush 成功信號,
*/
int MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
{
  if (flush_io_cache(&log_file))
  {
    THD *thd = current_thd;
    thd->commit_error = THD::CE_FLUSH_ERROR;
    return ER_ERROR_ON_WRITE;
  }
  *end_pos_var = my_b_tell(&log_file);
  return 0;
}
/**
   * 更新 binlog end pos 點位, 通知 dump 執行緒發送 binlog
  */
  void update_binlog_end_pos()
  {
    /*
      binlog_end_pos 僅在 master binlog 上使用,
    */
    if (is_relay_log)
      signal_update();
    else
    {
      // 持有 binlog end pos mutex.
      lock_binlog_end_pos();
      binlog_end_pos = my_b_tell(&log_file);
      // 通知 dump 執行緒
      signal_update();
      // 釋放 binlog end pos mutex
      unlock_binlog_end_pos();
    }
  }
  void signal_update()
  {
    DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
    signal_cnt++;
    mysql_cond_broadcast(&update_cond);
    DBUG_VOID_RETURN;
  }

 

/**
  Call fsync() to sync the file to disk.
  sync binlog file 到磁盤中,
  根據 sync_binlog 的設定決定是否刷盤,
*/
std::pair<bool, bool>
MYSQL_BIN_LOG::sync_binlog_file(bool force)
{
  bool synced = false;
  unsigned int sync_period = get_sync_period();
  if (force || (sync_period && ++sync_counter >= sync_period))
  {
    sync_counter = 0;
    if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1,
                         mysql_file_sync(log_file.file,
                                         MYF(MY_WME | MY_IGNORE_BADFD))))
    {
      THD *thd = current_thd;
      thd->commit_error = THD::CE_SYNC_ERROR;
      return std::make_pair(true, synced);
    }
    synced = true;
  }
  return std::make_pair(false, synced);
}

  

/**
   Auxiliary function used in ordered_commit.
   呼叫 after_sync hook. 增強半同步復制相關 after_sync 相關,
等待 dump 執行緒收到 slave 對于佇列中最大的 binlog_file 和 binlog_pos 的 ACK, */ static inline int call_after_sync_hook(THD *queue_head) { const char *log_file = NULL; my_off_t pos = 0; if (NO_HOOK(binlog_storage)) return 0; assert(queue_head != NULL); // 遍歷 queue 中的執行緒, 獲取到queue佇列中事務最大的 binlog file 和 pos for (THD *thd = queue_head; thd != NULL; thd = thd->next_to_commit) if (likely(thd->commit_error == THD::CE_NONE)) thd->get_trans_fixed_pos(&log_file, &pos); // 等待 dump 執行緒收到最大的 binlog file 和 pos 的 ACK, if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) || RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos))) { return ER_ERROR_ON_WRITE; } return 0; }

  

/**
  提交一系列會話,
  這個函式用于提交從 first 開始的佇列中的會話,
  如果 ordered commit 的 flushing 階段出錯, 則會傳入err code, 并標記所有執行緒,
  還會將事務的 GTID 添加到 gtid_executed 中,
 */
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
{
  // 持有 lock_commit lock.
  mysql_mutex_assert_owner(&LOCK_commit);
  // 遍歷 commit 佇列中的執行緒
  for (THD *head = first; head; head = head->next_to_commit)
  {
    /*
      如果 flush 失敗, 則為 session 設定 commit_error, 跳過改事務并繼續下一個事務,
      這會將所有的執行緒標記失敗, 因為 flush 失敗,
      如果 flush 成功, 鏈接上session并在引擎中提交,
    */
    // seq_no != 0
    if (head->get_transaction()->sequence_number != SEQ_UNINIT)
    {
      // lock LOCK_slave_trans_dep_tracker
      mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
      // 更新提交的最大的 seqno, last_commit
      m_dependency_tracker.update_max_committed(head);
      mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
    }
    /*
      應忽略 flush/sync 錯誤并繼續提交階段,
      因此此時 thd->commit_error 不能是 COMMIT_ERROR,
    */

    // 是否是一個真正的提交
    bool all = head->get_transaction()->m_flags.real_commit;
    // 如果 commit_low
    if (head->get_transaction()->m_flags.commit_low)
    {
      /*
        storage engine commit
        存盤引擎提交,
       */
      if (ha_commit_low(head, all, false))
        head->commit_error = THD::CE_COMMIT_ERROR;
    }
  }

  /*
    Handle the GTID of the threads.
    gtid_executed table is kept updated even though transactions fail to be
    logged. That's required by slave auto positioning.
    處理各session的 GTID,
    即使無法記錄事務, gtid_executed 表也會保持更新,這是 slave auto position 必須的,
  */
  // 更新 gtid_executed
  gtid_state->update_commit_group(first);

  for (THD *head = first; head; head = head->next_to_commit)
  {
    /*
      在存盤引擎提交后減少準備好的 XID 計數器,
      當遇到 flush 錯誤或session轉儲存盤時, 還是需要減少準備好的 XID, 以避免用戶執行緒, rotate執行緒
      和 dump執行緒之間的三向死鎖,
    */
    if (head->get_transaction()->m_flags.xid_written)
      // 減少 xid 計數器
      dec_prep_xids(head);
  }
}

  

/**
在 Innodb 中 commit 一個事務, 并標記 SQL 陳述句結束,*/
static int
innobase_commit(
		/*============*/
		handlerton *hton, /*!< in: InnoDB handlerton ; 存盤引擎 */
		THD *thd,					/*!< in: MySQL thread handle of the
					user for whom the transaction should
					be committed ; mysql thread headle  */
		bool commit_trx)	/*!< in: true - commit transaction
					false - the current SQL statement
					ended ; true: 事務提交 false: 當前 SQL 陳述句結束  */
{
	// 獲取 mysql handler 物件的 innodb trx, 如果相應的 MySQL執行緒結構缺乏一個則創建一個 innodb事務物件,
	trx_t *trx = check_trx_exists(thd);
	TrxInInnoDB trx_in_innodb(trx);
	// 如果事務已標記異步回滾
	if (trx_in_innodb.is_aborted())
	{
		// 進行回滾
		innobase_rollback(hton, thd, commit_trx);
		DBUG_RETURN(convert_error_code_to_mysql(
				DB_FORCED_ABORT, 0, thd));
	}
	/* 
	事務僅在 commit 或者回滾時取消注冊,
	如果取消注冊, 我們不能立即釋放資源, 我們可以立即回傳,
	目前, 雖然沒有什么需要清理的, 但我們還是謹慎行事, 進行清理,
	*/
	if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx))
	{
	}
	// read_only
	bool read_only = trx->read_only || trx->id == 0;
	// 如果是 事務 commit
	if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
	{
		/*  
		我們正在提交整個事務, 或者這是一個SQL陳述句結束并且 autocommit = on;
		我們需要當前的 binlog 位置才能讓 mysqlbackup 作業,
		*/
		// 不是只讀事務
		if (!read_only)
		{
			// 依據引數 innobase_commit_concurrency 來判斷是否有過多的執行緒同時提交,
			while (innobase_commit_concurrency > 0)
			{
				// 獲取 commit_cond_m mutex
				mysql_mutex_lock(&commit_cond_m);
				//  commit_threads + 1
				++commit_threads;
				// 正在提交的事務小于 innodb_commit_concurrency, 則釋放 commit_cond_m mutex, 進入提交流程
				if (commit_threads <= innobase_commit_concurrency)
				{
					mysql_mutex_unlock(&commit_cond_m);
					break;
				}
				--commit_threads;
				mysql_cond_wait(&commit_cond, &commit_cond_m);
				mysql_mutex_unlock(&commit_cond_m);
			}

			/*
			下面的呼叫讀取正在提交的事務的 binlog pos.
			其他引擎的二進制日志記錄和 Innodb 無關, 因為 Innodb 要求的是提交 Innodb 事務在 MySQL 二進制日志
			中的順序和 Innodb 日志中的順序相同, 這是由 server 保證的,
			如果沒有指定  binary log, 或者事務沒有寫 binlog, file name將是一個空指標,
			*/
			ulonglong pos;
			// 獲取thd最新寫入的 binlog 的 pos
			thd_binlog_pos(thd, &trx->mysql_log_file_name, &pos);
			trx->mysql_log_offset = static_cast<int64_t>(pos);
			/* Don't do write + flush right now. For group commit
			to work we want to do the flush later. 
			現在不要進行 flush, 對于組提交, 我們希望之后再 flush,
			*/
			trx->flush_log_later = true;
		}
		// innobase commit, innodb層的提交
		innobase_commit_low(trx);
		// 非 read_only
		if (!read_only)
		{
			trx->flush_log_later = false;
			// 當前正在提交的執行緒 -1
			if (innobase_commit_concurrency > 0)
			{

				mysql_mutex_lock(&commit_cond_m);
				ut_ad(commit_threads > 0);
				--commit_threads;
				mysql_cond_signal(&commit_cond);

				mysql_mutex_unlock(&commit_cond_m);
			}
		}
		// trx提交完成, 取消注冊
		trx_deregister_from_2pc(trx);
		/* Now do a write + flush of logs. 
			現在 write/flush logs.
		*/
		if (!read_only)
		{
			// write/flush 指定 trx lsn前的redo log到磁盤中,
               // 這里會再次 flush redo log, trx_commit_complete_for_mysql(trx); } } else { /* 如果是非事務提交 僅僅需要釋放指定SQL陳述句對應的表上的 auto-inc 鎖, */ if (!read_only) { lock_unlock_table_autoinc(trx); } /* 存盤當前事務的 undo_no, 以便如果需要roll back下一個SQL陳述句時直到rollback到哪里, */ trx_mark_sql_stat_end(trx); } /* Reset the number AUTO-INC rows required */ trx->n_autoinc_rows = 0; /* This is a statement level variable. */ trx->fts_next_doc_id = 0; // 強制一個執行緒 leave innodb, 即使他有多余的 tickets. innobase_srv_conc_force_exit_innodb(trx); DBUG_RETURN(0); }

  

/**
  Process after commit for a sequence of sessions.
  處理 after_commit.
 */
void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
  // 遍歷 commit 佇列中的執行緒
  for (THD *head = first; head; head = head->next_to_commit)
  {
    if (head->get_transaction()->m_flags.run_hooks &&
        head->commit_error != THD::CE_COMMIT_ERROR)
    {
      /*
              hook 可能會移動到 if 之外, 并且這可能是唯一的一處 after_commit 呼叫,
      */
      bool all = head->get_transaction()->m_flags.real_commit;
      (void)RUN_HOOK(transaction, after_commit, (head, all));
      head->get_transaction()->m_flags.run_hooks = false;
    }
  }
}
/* finish commit **/
int MYSQL_BIN_LOG::finish_commit(THD *thd)
{
  if (unlikely(!is_open()))
  {
    // 清空 binlog cache臨時檔案和記憶體
    binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
    if (cache_mngr)
      cache_mngr->reset();
  }
  if (thd->get_transaction()->sequence_number != SEQ_UNINIT)
  {
    mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
    // 更新 max_commit
    m_dependency_tracker.update_max_committed(thd);
    mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
  }
  if (thd->get_transaction()->m_flags.commit_low)
  {
    const bool all = thd->get_transaction()->m_flags.real_commit;
    /*
      這里, flush error 和 sync error 將被忽略
    */
    assert(thd->commit_error != THD::CE_COMMIT_ERROR);
    /*
      storage engine commit
      存盤引擎提交
    */
    if (ha_commit_low(thd, all, false))
      thd->commit_error = THD::CE_COMMIT_ERROR;
    if (thd->get_transaction()->m_flags.xid_written)
      dec_prep_xids(thd);
    /*
            after_commit hook
    */
    if ((thd->commit_error != THD::CE_COMMIT_ERROR) &&
        thd->get_transaction()->m_flags.run_hooks)
    {
      (void)RUN_HOOK(transaction, after_commit, (thd, all));
      thd->get_transaction()->m_flags.run_hooks = false;
    }
  }
  else if (thd->get_transaction()->m_flags.xid_written)
    dec_prep_xids(thd);

  if (!thd->owned_gtid.is_empty())
  {
    /*
      更新 gtid_executed
    */
    if (thd->commit_error == THD::CE_NONE)
    {
      gtid_state->update_on_commit(thd);
    }
    else
      gtid_state->update_on_rollback(thd);
  }

  

  

  

  

  

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/457566.html

標籤:MySQL

上一篇:帶有已取消令牌的Task.Factory.StartNew的狀態為running

下一篇:使用 LOAD DATA LOCAL INFILE,sysbench 導數速度提升30%

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more