主頁 > 資料庫 > [Raft共識演算法] Dragonboat Log Replication 代碼走讀

[Raft共識演算法] Dragonboat Log Replication 代碼走讀

2022-10-12 08:42:08 資料庫

Dragonboat Log Replication 代碼走讀

Dragonboat 是一個開源的高性能Go實作的Raft共識協議實作. 具有良好的性能和久經社區檢驗的魯棒性, 機遇巧合, 接觸到. 因此決定結合Raft博士論文走讀其原始碼. 今天帶來Raft中三大核心之一的日志復制Log Replication的代碼走讀.

Dragonboat Log Replication代碼實作結構

![Dragonboat log replication](/Users/tanghangyun/Documents/Dragonboat log replication.png)

Dragonboat中的網路介面呼叫主要在node.go檔案中實作, 作者提供了對網路介面的抽象, 可以自由實作底層的網路互動方法. 本次討論僅涉及對這些網路介面的代用邏輯, 也就是作業流的講解, 不涉及網路協議底層實作的邏輯討論. 作者在protobuf中定義了msg.Tpye, 并通過路由函式將不同Type的msg路由到不同的Handler函式進行處理.

msg Type 及其路由處理函式解讀

先介紹根據msg.Type 進行路由的路由函式

路由函式 initializeHandlerMap

func (r *raft) Handle(m pb.Message) {
	if !r.onMessageTermNotMatched(m) {
		r.doubleCheckTermMatched(m.Term)
        r.handle(r, m)
    } ...
}

func (r *raft) initializeHandlerMap() {
	// candidate
...
	// follower
	r.handlers[follower][pb.Propose] = r.handleFollowerPropose
	r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate
	r.handlers[follower][pb.Heartbeat] = r.handleFollowerHeartbeat
	r.handlers[follower][pb.ReadIndex] = r.handleFollowerReadIndex
	r.handlers[follower][pb.LeaderTransfer] = r.handleFollowerLeaderTransfer
	r.handlers[follower][pb.ReadIndexResp] = r.handleFollowerReadIndexResp
	r.handlers[follower][pb.InstallSnapshot] = r.handleFollowerInstallSnapshot
	r.handlers[follower][pb.Election] = r.handleNodeElection
	r.handlers[follower][pb.RequestVote] = r.handleNodeRequestVote
	r.handlers[follower][pb.TimeoutNow] = r.handleFollowerTimeoutNow
	r.handlers[follower][pb.ConfigChangeEvent] = r.handleNodeConfigChange
	r.handlers[follower][pb.LocalTick] = r.handleLocalTick
	r.handlers[follower][pb.SnapshotReceived] = r.handleRestoreRemote
	// leader
	r.handlers[leader][pb.LeaderHeartbeat] = r.handleLeaderHeartbeat
	r.handlers[leader][pb.CheckQuorum] = r.handleLeaderCheckQuorum
	r.handlers[leader][pb.Propose] = r.handleLeaderPropose
	r.handlers[leader][pb.ReadIndex] = r.handleLeaderReadIndex
	r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)
	r.handlers[leader][pb.HeartbeatResp] = lw(r, r.handleLeaderHeartbeatResp)
	r.handlers[leader][pb.SnapshotStatus] = lw(r, r.handleLeaderSnapshotStatus)
	r.handlers[leader][pb.Unreachable] = lw(r, r.handleLeaderUnreachable)
	r.handlers[leader][pb.LeaderTransfer] = r.handleLeaderTransfer
	r.handlers[leader][pb.Election] = r.handleNodeElection
	r.handlers[leader][pb.RequestVote] = r.handleNodeRequestVote
	r.handlers[leader][pb.ConfigChangeEvent] = r.handleNodeConfigChange
	r.handlers[leader][pb.LocalTick] = r.handleLocalTick
	r.handlers[leader][pb.SnapshotReceived] = r.handleRestoreRemote
	r.handlers[leader][pb.RateLimit] = r.handleLeaderRateLimit
	// observer
...
	// witness
...
}

重點需要關注的函式是 r.handlers[follower][pb.Propose] = r.handleFollowerPropose, r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate, r.handlers[leader][pb.Propose] = r.handleLeaderPropose, r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)這四個函式. 分別對應Follower處理Proposal訊息和Replicate訊息; 以及Leader處理ProposalS和ReplicateResp訊息. 接下來分別閱讀上述四個函式. 以及上述四個函式后續的呼叫堆疊. 最終在本地呼叫堆疊結束于send函式. send函式十分簡單僅僅將msgs添加到r.msgs領域中. 之后將有node掃描raft的msgs領域和logs領域中的快取訊息, 并發起網路互動.

send

func (r *raft) send(m pb.Message) {
   m.From = r.nodeID
   m = r.finalizeMessageTerm(m)
   r.msgs = append(r.msgs, m)
}

更新msg的任期以及原節點id資訊, 后添加到raft的msgs領域.

handleFollowerPropose

func (r *raft) handleFollowerPropose(m pb.Message) {
	if r.leaderID == NoLeader {
		plog.Warningf("%s dropped proposal, no leader", r.describe())
		r.reportDroppedProposal(m)
		return
	}
	m.To = r.leaderID
	// the message might be queued by the transport layer, this violates the
	// requirement of the entryQueue.get() func. copy the m.Entries to its
	// own space.
	m.Entries = newEntrySlice(m.Entries)
	r.send(m)
}

Follower接到客戶端的proposal(提議) 后需要將提議轉發給主節點, 因此更新完msg.To 目的節點資訊后立刻轉發. 呼叫send函式.

handleLeaderPropose 及其后續函式

func (r *raft) handleLeaderPropose(m pb.Message) {
	r.mustBeLeader()
	if r.leaderTransfering() {
		plog.Warningf("%s dropped proposal, leader transferring", r.describe())
		r.reportDroppedProposal(m)
		return
	}
	for i, e := range m.Entries {
		if e.Type == pb.ConfigChangeEntry {
			if r.hasPendingConfigChange() {
				plog.Warningf("%s dropped config change, pending change", r.describe())
				r.reportDroppedConfigChange(m.Entries[i])
				m.Entries[i] = pb.Entry{Type: pb.ApplicationEntry}
			}
			r.setPendingConfigChange()
		}
	}
	r.appendEntries(m.Entries)
	r.broadcastReplicateMessage()
}

前18行代碼都不是我們關注的重點: 大體進行一下在確認主節點完畢之后, 判斷當前集群狀態, 以及配置變更的操作. 最后兩行的帶嗎引起我的的注意. 他們分別是 r.appendEntries(m.Entries) r.broadcastReplicateMessage().

func (r *raft) appendEntries(entries []pb.Entry) {
	lastIndex := r.log.lastIndex()
	for i := range entries {
		entries[i].Term = r.term
		entries[i].Index = lastIndex + 1 + uint64(i)
	}
	r.log.append(entries)
	r.remotes[r.nodeID].tryUpdate(r.log.lastIndex())
	if r.isSingleNodeQuorum() {
		r.tryCommit()
	}
}

在appendEntries中更新每個entry的term和Index資訊. 并將這些entries添加到r.log中.

func (r *raft) broadcastReplicateMessage() {
	r.mustBeLeader()
	for nid := range r.observers {
		if nid == r.nodeID {
			plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
		}
	}
	for _, nid := range r.nodes() {
		if nid != r.nodeID {
			r.sendReplicateMessage(nid)
		}
	}
}

broadcastReplicateMessage方法中, 檢查完leader之后, 呼叫r.sendReplicateMessage(nid)來實作訊息的發送.

func (r *raft) sendReplicateMessage(to uint64) {
	var rp *remote
	if v, ok := r.remotes[to]; ok {
		rp = v
	} else if v, ok := r.observers[to]; ok {
		rp = v
	} else {
		rp, ok = r.witnesses[to]
		if !ok {
			plog.Panicf("%s failed to get the remote instance", r.describe())
		}
	}
	if rp.isPaused() {
		return
	}
	m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
	if err != nil {
		// log not available due to compaction, send snapshot
		if !rp.isActive() {
			plog.Warningf("%s, %s is not active, sending snapshot is skipped",
				r.describe(), NodeID(to))
			return
		}
		index := r.makeInstallSnapshotMessage(to, &m)
		plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
			r.describe(), index, NodeID(to), rp.next, rp.match, err)
		rp.becomeSnapshot(index)
	} else if len(m.Entries) > 0 {
		lastIndex := m.Entries[len(m.Entries)-1].Index
		rp.progress(lastIndex)
	}
	r.send(m)
}

該訊息發送函式進行了一系列狀態檢查和判斷之后, 最后一行陳述句點明主旨. 還是呼叫本段開始所述的send方法.

handleFollowerReplicate

func (r *raft) handleFollowerReplicate(m pb.Message) {
	r.leaderIsAvailable()
	r.setLeaderID(m.From)
	r.handleReplicateMessage(m)
}

前兩行判斷leader的資訊. 最后一行呼叫r.handleReplicateMessage(m)方法處理Replicate資訊.

在處理Replicate msg的程序中, 根據comitted資訊的不同將有兩種邏輯, 分別對應日志的復制和日志的提交.

func (r *raft) handleReplicateMessage(m pb.Message) {
	resp := pb.Message{
		To:   m.From,
		Type: pb.ReplicateResp,
	}
	if m.LogIndex < r.log.committed {
		resp.LogIndex = r.log.committed
		r.send(resp)
		return
	}
	if r.log.matchTerm(m.LogIndex, m.LogTerm) {
		r.log.tryAppend(m.LogIndex, m.Entries)
		lastIdx := m.LogIndex + uint64(len(m.Entries))
		r.log.commitTo(min(lastIdx, m.Commit))
		resp.LogIndex = lastIdx
	} else {
		plog.Debugf("%s rejected Replicate index %d term %d from %s",
			r.describe(), m.LogIndex, m.Term, NodeID(m.From))
		resp.Reject = true
		resp.LogIndex = m.LogIndex
		resp.Hint = r.log.lastIndex()
		if r.events != nil {
			info := server.ReplicationInfo{
				ClusterID: r.clusterID,
				NodeID:    r.nodeID,
				Index:     m.LogIndex,
				Term:      m.LogTerm,
				From:      m.From,
			}
			r.events.ReplicationRejected(info)
		}
	}
	r.send(resp)
}

func (l *entryLog) tryAppend(index uint64, ents []pb.Entry) bool {
	conflictIndex := l.getConflictIndex(ents)
	if conflictIndex != 0 {
		if conflictIndex <= l.committed {
			plog.Panicf("entry %d conflicts with committed entry, committed %d",
				conflictIndex, l.committed)
		}
		l.append(ents[conflictIndex-index-1:])
		return true
	}
	return false
}

func (l *entryLog) getConflictIndex(entries []pb.Entry) uint64 {
	for _, e := range entries {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

func (l *entryLog) commitTo(index uint64) {
	if index <= l.committed {
		return
	}
	if index > l.lastIndex() {
		plog.Panicf("invalid commitTo index %d, lastIndex() %d",
			index, l.lastIndex())
	}
	l.committed = index
}

func (l *entryLog) lastIndex() uint64 {
	index, ok := l.inmem.getLastIndex()
	if ok {
		return index
	}

	_, index = l.logdb.GetRange()
	return index
}

前五行構造了replicaresp資料結構的同時, 對當前的committedIndex和m.LogIndex進行對比, 顯然拒絕了比當前已提交的Index更小的訊息. 之后在11--15行的代碼中, 進行了term任期校驗后, 添加msg到r.log中, 更新其committed的index值. 一切結束之后使用前述的send方法回傳Resp.

handleLeaderReplicateResp

func (r *raft) handleLeaderReplicateResp(m pb.Message, rp *remote) {
	r.mustBeLeader()
	rp.setActive()
	if !m.Reject {
		paused := rp.isPaused()
		if rp.tryUpdate(m.LogIndex) {
			rp.respondedTo()
			if r.tryCommit() {
				r.broadcastReplicateMessage()
			} else if paused {
				r.sendReplicateMessage(m.From)
			}
			// according to the leadership transfer protocol listed on the p29 of the
			// raft thesis
			if r.leaderTransfering() && m.From == r.leaderTransferTarget &&
				r.log.lastIndex() == rp.match {
				r.sendTimeoutNowMessage(r.leaderTransferTarget)
			}
		}
	} else {
		// the replication flow control code is derived from etcd raft, it resets
		// nextIndex to match + 1. it is thus even more conservative than the raft
		// thesis's approach of nextIndex = nextIndex - 1 mentioned on the p21 of
		// the thesis.
		if rp.decreaseTo(m.LogIndex, m.Hint) {
			r.enterRetryState(rp)
			r.sendReplicateMessage(m.From)
		}
	}
}

不考慮失敗的其他情況, 重點關注5--19行的代碼, 不難發現, r.tryCommit() 和``r.broadcastReplicateMessage()`是值得重點注意的. 其中第一個函式負責狀態判斷, 第二個函式負責訊息的廣播.

func (r *raft) tryCommit() bool {
	r.mustBeLeader()
	if r.numVotingMembers() != len(r.matched) {
		r.resetMatchValueArray()
	}
	idx := 0
	for _, v := range r.remotes {
		r.matched[idx] = v.match
		idx++
	}
	for _, v := range r.witnesses {
		r.matched[idx] = v.match
		idx++
	}
	r.sortMatchValues()
	q := r.matched[r.numVotingMembers()-r.quorum()]
	// see p8 raft paper
	// "Raft never commits log entries from previous terms by counting replicas.
	// Only log entries from the leader’s current term are committed by counting
	// replicas"
	return r.log.tryCommit(q, r.term)
}

判斷完leader身份之后進行?? 此處存疑. 之后到entryLog進行commit操作. 對于leader來說已經完成了日志提交的程序了, 但是client還需要對leader的本次Replicate資訊進行反饋.

func (l *entryLog) tryCommit(index uint64, term uint64) bool {
	if index <= l.committed {
		return false
	}
	lterm, err := l.term(index)
	if err == ErrCompacted {
		lterm = 0
	} else if err != nil {
		panic(err)
	}
	if index > l.committed && lterm == term {
		l.commitTo(index)
		return true
	}
	return false
}

具體的commit邏輯還是在entrylog的方法中實作的.

func (r *raft) broadcastReplicateMessage() {
	r.mustBeLeader()
	for nid := range r.observers {
		if nid == r.nodeID {
			plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
		}
	}
	for _, nid := range r.nodes() {
		if nid != r.nodeID {
			plog.Errorf("[Aibot] %s is sending replicate message to %s", r.describe(), NodeID(nid))
			r.sendReplicateMessage(nid)
		}
	}
}

判斷完狀態最后一行進行訊息的發送

func (r *raft) sendReplicateMessage(to uint64) {
	var rp *remote
	if v, ok := r.remotes[to]; ok {
		rp = v
	} else if v, ok := r.observers[to]; ok {
		rp = v
	} else {
		rp, ok = r.witnesses[to]
		if !ok {
			plog.Panicf("%s failed to get the remote instance", r.describe())
		}
	}
	if rp.isPaused() {
		return
	}
	m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
	if err != nil {
		// log not available due to compaction, send snapshot
		if !rp.isActive() {
			plog.Warningf("%s, %s is not active, sending snapshot is skipped",
				r.describe(), NodeID(to))
			return
		}
		index := r.makeInstallSnapshotMessage(to, &m)
		plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
			r.describe(), index, NodeID(to), rp.next, rp.match, err)
		rp.becomeSnapshot(index)
	} else if len(m.Entries) > 0 {
		lastIndex := m.Entries[len(m.Entries)-1].Index
		rp.progress(lastIndex)
	}
	r.send(m)
}

從第16行開始構造一個replicate Message開始, 這里的pregress方法提供對遠程狀態的管理.

func (r *raft) makeReplicateMessage(to uint64,
	next uint64, maxSize uint64) (pb.Message, error) {
	term, err := r.log.term(next - 1)
	if err != nil {
		return pb.Message{}, err
	}
	entries, err := r.log.entries(next, maxSize)
	if err != nil {
		return pb.Message{}, err
	}
	if len(entries) > 0 {
		lastIndex := entries[len(entries)-1].Index
		expected := next - 1 + uint64(len(entries))
		if lastIndex != expected {
			plog.Panicf("%s expected last index in Replicate %d, got %d",
				r.describe(), expected, lastIndex)
		}
	}
	// Don't send actual log entry to witness as they won't replicate real message,
	// unless there is a config change.
	if _, ok := r.witnesses[to]; ok {
		entries = makeMetadataEntries(entries)
	}
	return pb.Message{
		To:       to,
		Type:     pb.Replicate,
		LogIndex: next - 1,
		LogTerm:  term,
		Entries:  entries,
		Commit:   r.log.committed,
	}, nil
}

構建Replicate, msg. 之后發送給follower.

func (r *remote) progress(lastIndex uint64) {
   if r.state == remoteReplicate {
      r.next = lastIndex + 1
   } else if r.state == remoteRetry {
      r.retryToWait()
   } else {
      panic("unexpected remote state")
   }
}

node的互動邏輯

主行程中有一個while True回圈進行實時變更的處理.

func (e *engine) stepWorkerMain(workerID uint64) {
	nodes := make(map[uint64]*node)
	ticker := time.NewTicker(nodeReloadInterval)
	defer ticker.Stop()
	cci := uint64(0)
	stopC := e.nodeStopper.ShouldStop()
	updates := make([]pb.Update, 0)
	for {
		select {
		case <-stopC:
			e.offloadNodeMap(nodes)
			return
		case <-ticker.C:
			nodes, cci = e.loadStepNodes(workerID, cci, nodes)
			e.processSteps(workerID, make(map[uint64]struct{}), nodes, updates, stopC)
		case <-e.stepCCIReady.waitCh(workerID):
			nodes, cci = e.loadStepNodes(workerID, cci, nodes)
		case <-e.stepWorkReady.waitCh(workerID):
			if cci == 0 || len(nodes) == 0 {
				nodes, cci = e.loadStepNodes(workerID, cci, nodes)
			}
			active := e.stepWorkReady.getReadyMap(workerID)
			e.processSteps(workerID, active, nodes, updates, stopC)
		}
	}
}

在這個回圈中的第23行e.processSteps(workerID, active, nodes, updates, stopC)監控事件的狀態并進行處理

func (e *engine) processSteps(workerID uint64,
	active map[uint64]struct{},
	nodes map[uint64]*node, nodeUpdates []pb.Update, stopC chan struct{}) {
	if len(nodes) == 0 {
		return
	}
	if len(active) == 0 {
		for cid := range nodes {
			active[cid] = struct{}{}
		}
	}
	nodeUpdates = nodeUpdates[:0]
	for cid := range active {
		node, ok := nodes[cid]
		if !ok || node.stopped() {
			continue
		}
		if ud, hasUpdate := node.stepNode(); hasUpdate {
			nodeUpdates = append(nodeUpdates, ud)
		}
	}
	e.applySnapshotAndUpdate(nodeUpdates, nodes, true)
	// see raft thesis section 10.2.1 on details why we send Replicate message
	// before those entries are persisted to disk
	for _, ud := range nodeUpdates {
		node := nodes[ud.ClusterID]
		node.sendReplicateMessages(ud)
		node.processReadyToRead(ud)
		node.processDroppedEntries(ud)
		node.processDroppedReadIndexes(ud)
	}
	if err := e.logdb.SaveRaftState(nodeUpdates, workerID); err != nil {
		panic(err)
	}
	if err := e.onSnapshotSaved(nodeUpdates, nodes); err != nil {
		panic(err)
	}
	e.applySnapshotAndUpdate(nodeUpdates, nodes, false)
	for _, ud := range nodeUpdates {
		node := nodes[ud.ClusterID]
		if err := node.processRaftUpdate(ud); err != nil {
			panic(err)
		}
		e.processMoreCommittedEntries(ud)
		node.commitRaftUpdate(ud)
	}
	if lazyFreeCycle > 0 {
		resetNodeUpdate(nodeUpdates)
	}
}

在這個方法中第18行stepNode方法負責進行Node本地事務的處理包括本地客戶端以及其他節點發送到本機的訊息. 第41行負責進行網路互動processRaftUpdate

func (n *node) processRaftUpdate(ud pb.Update) error {
	if err := n.logReader.Append(ud.EntriesToSave); err != nil {
		return err
	}
	n.sendMessages(ud.Messages)
	if err := n.removeLog(); err != nil {
		return err
	}
	if err := n.runSyncTask(); err != nil {
		return err
	}
	if n.saveSnapshotRequired(ud.LastApplied) {
		n.pushTakeSnapshotRequest(rsm.SSRequest{})
	}
	return nil
}

第5行 n.sendMessages(ud.Messages)方法

func (n *node) sendMessages(msgs []pb.Message) {
	for _, msg := range msgs {
		if !isFreeOrderMessage(msg) {
			msg.ClusterId = n.clusterID
			n.sendRaftMessage(msg)
		}
	}
}

第5行n.sendRaftMessage(msg)由上層函式指定方法

func (nh *NodeHost) sendMessage(msg pb.Message) {
	if nh.isPartitioned() {
		return
	}
	if msg.Type != pb.InstallSnapshot {
		nh.transport.Send(msg)
	} else {
		witness := msg.Snapshot.Witness
		plog.Debugf("%s is sending snapshot to %s, witness %t, index %d, size %d",
			dn(msg.ClusterId, msg.From), dn(msg.ClusterId, msg.To),
			witness, msg.Snapshot.Index, msg.Snapshot.FileSize)
		if n, ok := nh.getCluster(msg.ClusterId); ok {
			if witness || !n.OnDiskStateMachine() {
				nh.transport.SendSnapshot(msg)
			} else {
				n.pushStreamSnapshotRequest(msg.ClusterId, msg.To)
			}
		}
		nh.events.sys.Publish(server.SystemEvent{
			Type:      server.SendSnapshotStarted,
			ClusterID: msg.ClusterId,
			NodeID:    msg.To,
			From:      msg.From,
		})
	}
}s

第6行nh.transport.Send(msg)

// Send asynchronously sends raft messages to their target nodes.
//
// The generic async send Go pattern used in Send() is found in CockroachDB's
// codebase.
func (t *Transport) Send(req pb.Message) bool {
	v, _ := t.send(req)
	if !v {
		t.metrics.messageSendFailure(1)
	}
	return v
}

Raft日志復制程序詳解

日志復制

日志提交

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/513726.html

標籤:其它

上一篇:day10-習題

下一篇:StoneDB主從切換實踐方案

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more