fabric1.0學習筆記(1)
fabric1.0學習筆記(2)
fabric1.0學習筆記(3)
fabric1.0學習筆記(4)
fabric1.0學習筆記(5)
本次接學習筆記(4),深入了解賬本存盤的內容
分為四個主要部分
- 賬本存盤介面定義
- 交易讀寫集
- 狀態資料庫及歷史狀態資料庫
- 區塊檔案存盤及區塊索引
一、賬本存盤中的主要介面定義
fabric為賬本存盤相關的方法封裝了6個介面
分別是
- PeerLedgerProvider對賬本實體操作的方法介面
- PeerLedger賬本中的方法介面包括賬本物件及獲取交易、獲取區塊等方法
- ValidatedLedger有效賬本,也就是賬本檔案存盤介面,只有賬本物件,沒有其他方法
- QueryExecutor賬本查詢方法介面如獲取世界狀態等
- HistoryQueryExecutor歷史查詢方法介面只有一個獲取key的變動歷史
- TxSimulator交易模擬方法介面,是交易執行的背景關系,包括了前面的QueryExecutor、一些對世界狀態的操作,以及一個回傳交易模擬執行的結果即匯出交易讀寫集
相關代碼在ledger路徑下
ledger_interface.go
// PeerLedgerProvider provides handle to ledger instances
type PeerLedgerProvider interface {
// Create creates a new ledger with the given genesis block.
// This function guarantees that the creation of ledger and committing the genesis block would an atomic action
// The chain id retrieved from the genesis block is treated as a ledger id
//根據創世區塊創建一個新的賬本,創世區塊中的通道id作為賬本id
Create(genesisBlock *common.Block) (PeerLedger, error)
// Open opens an already created ledger
//根據賬本id打開一個已經存在的賬本
Open(ledgerID string) (PeerLedger, error)
// Exists tells whether the ledger with given id exists
//判斷某個賬本id對應的賬本是否存在
Exists(ledgerID string) (bool, error)
// List lists the ids of the existing ledgers
//列出所有的賬本id
List() ([]string, error)
// Close closes the PeerLedgerProvider
//關閉介面?
Close()
}
// PeerLedger differs from the OrdererLedger in that PeerLedger locally maintain a bitmask
that tells apart valid transactions from invalid ones
//peer節點的賬本和orderer節點的賬本有所區別,peer會檢驗交易是否有效
//peerledgerprovider介面創建的賬本都是創建的該物件
type PeerLedger interface {
//賬本的檔案存盤
commonledger.Ledger
// GetTransactionByID retrieves a transaction by id
//根據交易id回傳對應的交易
GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
// GetBlockByHash returns a block given it's hash
//根據區塊哈希回傳對應的區塊
GetBlockByHash(blockHash []byte) (*common.Block, error)
// GetBlockByTxID returns a block which contains a transaction
//根據交易id回傳交易所在的區塊
GetBlockByTxID(txID string) (*common.Block, error)
// GetTxValidationCodeByTxID returns reason code of transaction validation
//根據交易id回傳對應交易的驗證碼(狀態,有效、無效)
GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
//以上四個是賬本的索引
// NewTxSimulator gives handle to a transaction simulator.
// A client can obtain more than one 'TxSimulator's for parallel execution.
// Any snapshoting/synchronization should be performed at the implementation level if required
//生成新的交易模擬器,可以理解為交易的背景關系,封裝了一些對世界狀態的操作
NewTxSimulator() (TxSimulator, error)
// NewQueryExecutor gives handle to a query executor.
// A client can obtain more than one 'QueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
//新的查詢執行器,查詢狀態資料庫
NewQueryExecutor() (QueryExecutor, error)
// NewHistoryQueryExecutor gives handle to a history query executor.
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
//新的歷史查詢器,查詢歷史資料庫
NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
//Prune prunes the blocks/transactions that satisfy the given policy
//根據策略對區塊或交易進行裁剪,1.0是一個空方法
Prune(policy commonledger.PrunePolicy) error
}
// ValidatedLedger represents the 'final ledger' after filtering out invalid transactions from PeerLedger.
// Post-v1
//經檢驗有效的賬本,即最終賬本,指代賬本檔案存盤
type ValidatedLedger interface {
commonledger.Ledger
}
// QueryExecutor executes the queries
// Get* methods are for supporting KV-based data model. ExecuteQuery method is for supporting a rich datamodel and query support
//
// ExecuteQuery method in the case of a rich data model is expected to support queries on
// latest state, historical state and on the intersection of state and transactions
//賬本查詢
type QueryExecutor interface {
// GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
//獲取一個狀態,namespace可以理解為賬本id和鏈碼名的組合,key是賬本中的物件
GetState(namespace string, key string) ([]byte, error)
// GetStateMultipleKeys gets the values for multiple keys in a single call
//獲取多個狀態
GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
// GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons.
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
//獲取狀態區間
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
//上面方法都是對鍵值對模型的操作
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// Only used for state databases that support query
// For a chaincode, the namespace corresponds to the chaincodeId
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
//富文本查詢/模糊查詢,只有couchdb支持,leveldb不支持
ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)
// Done releases resources occupied by the QueryExecutor
Done()
}
// HistoryQueryExecutor executes the history queries
type HistoryQueryExecutor interface {
// GetHistoryForKey retrieves the history of values for a key.
// The returned ResultsIterator contains results of type *KeyModification which is defined in protos/ledger/queryresult.
//根據key獲取key的變動歷史
GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error)
}
// TxSimulator simulates a transaction on a consistent snapshot of the 'as recent state as possible'
// Set* methods are for supporting KV-based data model. ExecuteUpdate method is for supporting a rich datamodel and query support
//交易執行的背景關系
type TxSimulator interface {
QueryExecutor//上面的查詢執行器
// SetState sets the given value for the given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
SetState(namespace string, key string, value []byte) error
// DeleteState deletes the given namespace and key
DeleteState(namespace string, key string) error
// SetMultipleKeys sets the values for multiple keys in a single call
SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
// ExecuteUpdate for supporting rich data model (see comments on QueryExecutor above)
//以上三個方法對state即世界狀態進行更改
ExecuteUpdate(query string) error
// GetTxSimulationResults encapsulates the results of the transaction simulation.
// This should contain enough detail for
// - The update in the state that would be caused if the transaction is to be committed
// - The environment in which the transaction is executed so as to be able to decide the validity of the environment
// (at a later time on a different peer) during committing the transactions
// Different ledger implementation (or configurations of a single implementation) may want to represent the above two pieces
// of information in different way in order to support different data-models or optimize the information representations.
//獲取交易模擬執行的結果,匯出交易讀寫集
GetTxSimulationResults() ([]byte, error)
}
接下來是構建讀寫集
lockbased_tx_simulator.go
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder //用于構建讀寫集
}
主要實作還是在RWSetBuilder中,進入RWSetBuilder看一下,主要包括的是對讀寫集的添加和匯出操作
rwset_builder.go
// RWSetBuilder helps building the read-write set
type RWSetBuilder struct {
rwMap map[string]*nsRWs //按照namespace隔離的map
}
// NewRWSetBuilder constructs a new instance of RWSetBuilder
func NewRWSetBuilder() *RWSetBuilder {
return &RWSetBuilder{make(map[string]*nsRWs)}
}
// AddToReadSet adds a key and corresponding version to the read-set
//向讀集里添加鍵值對
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.readMap[key] = NewKVRead(key, version)
}
// AddToWriteSet adds a key and value to the write-set
//向寫集里添加鍵值對
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.writeMap[key] = newKVWrite(key, value)
}
// AddToRangeQuerySet adds a range query info for performing phantom read validation
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
nsRWs := rws.getOrCreateNsRW(ns)
key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
_, ok := nsRWs.rangeQueriesMap[key]
if !ok {
nsRWs.rangeQueriesMap[key] = rqi
nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
}
}
// GetTxReadWriteSet returns the read-write set in the form that can be serialized
//匯出builder中生成的讀寫集
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
txRWSet := &TxRwSet{}
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
//Get namespace specific read-writes
nsReadWriteMap := rws.rwMap[ns]
//add read set
var reads []*kvrwset.KVRead
sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key])
}
//add write set
var writes []*kvrwset.KVWrite
sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
//add range query info
var rangeQueriesInfo []*kvrwset.RangeQueryInfo
rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
for _, key := range nsReadWriteMap.rangeQueriesKeys {
rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
}
kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
nsRWs := &NsRwSet{ns, kvRWs}
txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
}
return txRWSet
}
二、交易讀寫集
交易讀寫集就是告訴區塊鏈交易對哪些資料進行了讀操作,對哪些資料進行了寫操作或更新操作 包括讀集、寫集和版本號
- 構建讀寫集
相關代碼在ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr路徑下
lockbased_tx_simulator.go
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder //用于構建讀寫集
}
主要實作還是在RWSetBuilder中,進入RWSetBuilder看一下,主要包括的是對讀寫集的添加和匯出操作
rwset_builder.go
// RWSetBuilder helps building the read-write set
type RWSetBuilder struct {
rwMap map[string]*nsRWs //按照namespace隔離的map
}
// NewRWSetBuilder constructs a new instance of RWSetBuilder
func NewRWSetBuilder() *RWSetBuilder {
return &RWSetBuilder{make(map[string]*nsRWs)}
}
// AddToReadSet adds a key and corresponding version to the read-set
//向讀集里添加鍵值對
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.readMap[key] = NewKVRead(key, version)
}
// AddToWriteSet adds a key and value to the write-set
//向寫集里添加鍵值對
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.writeMap[key] = newKVWrite(key, value)
}
// AddToRangeQuerySet adds a range query info for performing phantom read validation
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
nsRWs := rws.getOrCreateNsRW(ns)
key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
_, ok := nsRWs.rangeQueriesMap[key]
if !ok {
nsRWs.rangeQueriesMap[key] = rqi
nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
}
}
// GetTxReadWriteSet returns the read-write set in the form that can be serialized
//匯出builder中生成的讀寫集
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
txRWSet := &TxRwSet{}
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
//Get namespace specific read-writes
nsReadWriteMap := rws.rwMap[ns]
//add read set
var reads []*kvrwset.KVRead
sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key])
}
//add write set
var writes []*kvrwset.KVWrite
sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
//add range query info
var rangeQueriesInfo []*kvrwset.RangeQueryInfo
rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
for _, key := range nsReadWriteMap.rangeQueriesKeys {
rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
}
kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
nsRWs := &NsRwSet{ns, kvRWs}
txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
}
return txRWSet
}
以上是交易讀寫集的構造,接下來看交易讀寫集的驗證
相關檔案在ledger/kvledger/txmgmt/validator/statebasedval目錄下
state_based_validator.go
//validate endorser transaction
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
//preparation for extracting RWSet from transaction
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action
// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
txResult := peer.TxValidationCode_VALID
//以上都是反序列化交易讀寫集
//mvccvalidation, may invalidate transaction
if doMVCCValidation {
if txResult, err = v.validateTx(txRWSet, updates); err != nil {
return nil, txResult, err
} else if txResult != peer.TxValidationCode_VALID {
txRWSet = nil
}
}
return txRWSet, txResult, err
}
// ValidateAndPrepareBatch implements method in Validator interface
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
//記錄狀態資料庫的更改
updates := statedb.NewUpdateBatch()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
// Committer validator has already set validation flags based on well formed tran checks
//獲取區塊里交易的狀態
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Precaution in case committer validator has not added validation flags yet
//判斷是否已經被其他驗證者驗證過
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
//略過無效交易
if txsFilter.IsInvalid(txIndex) {
// Skiping invalid transaction
logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
block.Header.Number, txIndex, txsFilter.Flag(txIndex))
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
}
payload, err := putils.GetPayload(env)
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txType := common.HeaderType(chdr.Type)
//略過非endorser交易
if txType != common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
block.Header.Number, txIndex, txType)
continue
}
//驗證交易讀寫集,txRWSet為過濾后的交易讀寫集, txResult交易的狀態, err錯誤資訊
txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)
if err != nil {
return nil, err
}
//更新交易的狀態
txsFilter.SetFlag(txIndex, txResult)
//txRWSet != nil => t is valid
//將過濾后的交易放入待更新的狀態集中
if txRWSet != nil {
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
addWriteSetToBatch(txRWSet, committingTxHeight, updates)
txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
}
if txsFilter.IsValid(txIndex) {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
block.Header.Number, txIndex, chdr.TxId)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
}
}
//更新交易狀態到區塊的元資料中
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
return updates, nil
}
func addWriteSetToBatch(txRWSet *rwsetutil.TxRwSet, txHeight *version.Height, batch *statedb.UpdateBatch) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
if kvWrite.IsDelete {
batch.Delete(ns, kvWrite.Key, txHeight)
} else {
batch.Put(ns, kvWrite.Key, kvWrite.Value, txHeight)
}
}
}
}
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
//驗證讀集
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
//rangequeries是讀集的拓展
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
上面的一段是驗證的外圍操作,主要是判讀交易是否需要驗證以及驗證成功后將交易放入待更新的狀態集中、將一個區塊中的所有的交易狀態保存到區塊的元資料中
核心的驗證代碼在validateKVRead中
// validateKVRead performs mvcc check for a key read during transaction simulation.
// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block)
// or in the updates (by a preceding valid transaction in the current block)
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
//看狀態更新集里面有沒有與讀集相同的內容,有則說明之前的交易修改了該鍵值對,該交易無效
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
//讀取世界狀態中讀集對應的版本號
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, err
}
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
//驗證世界狀態中的version與讀集中的version是否是一致的
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
主要驗證邏輯是:首先判斷狀態更新集里有沒有與讀集相同的內容,有則說明之前的交易修改了該鍵值對,該交易判定為無效,如果沒有則讀取世界狀態中讀集對應的版本號,判斷交易中的版本號與世界狀態中的版本號是否一致,如果一致則為有效交易,否則為無效交易,
三、狀態資料庫及歷史狀態資料庫
二者是不同的概念,分別對應世界狀態和歷史資料索引,歷史狀態資料庫保存操作包含在哪個區塊,并不保存值,只保存值變動的動作,
狀態資料庫要解決三個問題:
- 如何聯系智能合約中的鍵和底層存盤的鍵值對
- 如何持久化區塊資訊
- 如何標識最新的區塊資訊
第一個問題:狀態資料庫通過保存和讀取由通道名、鏈碼名、物件名及分隔符組成的組合鍵來建立智能合約中的鍵與底層存盤的鍵值對之間的關系
相關代碼在ledger/kvledger/txmgmt/stateleveldb目錄下
stateleveldb.go
// GetState implements method in VersionedDB interface
//獲取狀態,通過compositekey,由通道名、鏈碼名、物件名及分隔符組成查詢value
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
compositeKey := constructCompositeKey(namespace, key)
dbVal, err := vdb.db.Get(compositeKey)
if err != nil {
return nil, err
}
if dbVal == nil {
return nil, nil
}
val, ver := statedb.DecodeValue(dbVal)
return &statedb.VersionedValue{Value: val, Version: ver}, nil
}
// GetStateMultipleKeys implements method in VersionedDB interface
//查詢多個鍵的值
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
vals := make([]*statedb.VersionedValue, len(keys))
for i, key := range keys {
val, err := vdb.GetState(namespace, key)
if err != nil {
return nil, err
}
vals[i] = val
}
return vals, nil
}
// GetStateRangeScanIterator implements method in VersionedDB interface
// startKey is inclusive
// endKey is exclusive
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
compositeStartKey := constructCompositeKey(namespace, startKey)
compositeEndKey := constructCompositeKey(namespace, endKey)
if endKey == "" {
compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
}
dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
return newKVScanner(namespace, dbItr), nil
}
// ExecuteQuery implements method in VersionedDB interface
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
return nil, errors.New("ExecuteQuery not supported for leveldb")
}
以上是對處理、保存、讀取組合鍵的相關方法
第二、三個問題:區塊信息的持久化、標識最新的區塊資訊
相關代碼在同一個路徑下(ledger/kvledger/txmgmt/stateleveldb)
// ApplyUpdates implements method in VersionedDB interface
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
//獲取更更新器
dbBatch := leveldbhelper.NewUpdateBatch()
//獲取namespace
namespaces := batch.GetUpdatedNamespaces()
//對每個namespace的更新內容回圈進行更新
for _, ns := range namespaces {
updates := batch.GetUpdates(ns)
for k, vv := range updates {
compositeKey := constructCompositeKey(ns, k)
logger.Debugf("Channel [%s]: Applying key=[%#v]", vdb.dbName, compositeKey)
//如果要更新的值為空則洗掉對應鍵,否則將compositekey作為鍵,value和version共同作為值保存到狀態資料庫中
if vv.Value == nil {
dbBatch.Delete(compositeKey)
} else {
dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
}
}
}
//標識最新的區塊高度,此處即是標識最新的區塊資訊
dbBatch.Put(savePointKey, height.ToBytes())
// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
return err
}
return nil
}
狀態資料庫的實作代碼,包括幾個查詢方法、一個更新方法以及一些輔助方法(如智能合約中的鍵值對與狀態資料庫中鍵值對進行轉化的方法),狀態資料庫中的鍵是以智能合約的通道名、智能合約的名字以及智能合約中的鍵拼接在一起作為鍵,值則是智能合約中的值以及對應的狀態版本為值,
以上是區塊持久化及標識最新區塊的方法,狀態資料庫的點基本上就在上面的三個問題里了,
再看歷史資料庫的實作方法
同樣有兩個問題需要解決:
- 如何標識某個鍵是在哪一個交易中被改變的
- 如何查詢某個鍵的變動歷史
第一個問題:與狀態資料庫保存智能合約鍵的方法類似,也是通過組合鍵的形式,具體是通過由鍵和區塊id,交易id組成的組合鍵來完成的
// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {
blockNo := block.Header.Number
//Set the starting tranNo to 0
var tranNo uint64
dbBatch := leveldbhelper.NewUpdateBatch()
logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
historyDB.dbName, blockNo, len(block.Data.Data))
// Get the invalidation byte array for the block
//獲取區塊的驗證資料,用于判斷區塊中的每筆交易是否合法
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Initialize txsFilter if it does not yet exist (e.g. during testing, for genesis block, etc)
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
// write each tran's write set to history db
for _, envBytes := range block.Data.Data {
// If the tran is marked as invalid, skip it
//如果交易為無效交易則跳過
if txsFilter.IsInvalid(int(tranNo)) {
logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
historyDB.dbName, tranNo)
tranNo++
continue
}
//獲取envelope
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return err
}
//從envelope中獲取交易體
payload, err := putils.GetPayload(env)
if err != nil {
return err
}
//獲取并解碼交易體中通道頭的部分
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
}
//如果是普通交易(非配置交易)則修改歷史資料庫
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
// extract actions from the envelope message
//從envelope中獲取操作
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return err
}
//preparation for extracting RWSet from transaction
//用于保存交易讀寫集
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
//從操作結果中獲取讀寫集
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return err
}
// for each transaction, loop through the namespaces and writesets
// and add a history record for each write
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
//namespace哪個通道、哪個智能合約
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
writeKey := kvWrite.Key
//composite key for history records is in the form ns~key~blockNo~tranNo
//回圈將寫集中每個namespace下的每個鍵和區塊id,交易id作為組合鍵保存在狀態更新集中
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
dbBatch.Put(compositeHistoryKey, emptyValue)
}
}
} else {
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}
tranNo++
}
// add savepoint for recovery purpose
//將區塊高度保存下來
height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())
// write the block's history records and savepoint to LevelDB
// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
//把狀態更新集存入歷史資料庫
if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
return err
}
logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
return nil
}
第二個問題:它解決的是標識key是在哪一筆交易中修改的,具體解決方法是通過保存通道名,智能合約名,智能合約中的鍵,區塊編號以及交易編號的組合鍵來標識,
獲取歷史資訊
historyleveldb_query_executer.go
// GetHistoryForKey implements method in interface `ledger.HistoryQueryExecutor`
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) {
//檢查賬本配置中歷史資料庫是否開啟
if ledgerconfig.IsHistoryDBEnabled() == false {
return nil, errors.New("History tracking not enabled - historyDatabase is false")
}
var compositeStartKey []byte
var compositeEndKey []byte
compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false)
compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true)
// range scan to find any history records starting with namespace~key
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
//獲取歷史查詢物件,包括通道id、智能合約名的組合鍵、單獨的通道id、智能合約名、智能合約鍵、iterator、區塊存盤物件
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}
//historyScanner implements ResultsIterator for iterating through history results
type historyScanner struct {
compositePartialKey []byte //compositePartialKey includes namespace~key
namespace string
key string
dbItr iterator.Iterator
blockStore blkstorage.BlockStore
}
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}
func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
if !scanner.dbItr.Next() {
return nil, nil
}
historyKey := scanner.dbItr.Key() // history key is in the form namespace~key~blocknum~trannum
// SplitCompositeKey(namespace~key~blocknum~trannum, namespace~key~) will return the blocknum~trannum in second position
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)
//首先根據歷史查詢物件獲取某個歷史記錄的區塊高度、交易編號
// Get the transaction from block storage that is associated with this history record
//根據區塊高度和交易編號獲取交易的envelope
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
// Get the txid, key write value, timestamp, and delete indicator associated with this transaction
//根據envelope獲取交易的歷史資訊(交易id、更新值、時間戳等)
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
return queryResult, nil
}
首先根據namespace(通道id、智能合約名)智能合約鍵查詢所有相關的記錄,根據查詢出的內容找到區塊高度和交易i,然后獲取envelope再根據envelope獲取交易的歷史資訊(交易id、更新值、時間戳等)
最后是區塊檔案存盤的代碼
相關檔案在common/ledger/blkstorage目錄下
blockstorage.go
// fsBlockStore - filesystem based implementation for `BlockStore`
type fsBlockStore struct {//區塊檔案存盤
id string
conf *Conf
fileMgr *blockfileMgr
}
// NewFsBlockStore constructs a `FsBlockStore`
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
dbHandle *leveldbhelper.DBHandle) *fsBlockStore {
return &fsBlockStore{id, conf, newBlockfileMgr(id, conf, indexConfig, dbHandle)}
}
// AddBlock adds a new block
//增加區塊方法
func (store *fsBlockStore) AddBlock(block *common.Block) error {
return store.fileMgr.addBlock(block)
}
// GetBlockchainInfo returns the current info about blockchain
//回傳鏈的資訊
func (store *fsBlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) {
return store.fileMgr.getBlockchainInfo(), nil
}
// RetrieveBlocks returns an iterator that can be used for iterating over a range of blocks
//回傳區塊的迭代器
func (store *fsBlockStore) RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error) {
var itr *blocksItr
var err error
if itr, err = store.fileMgr.retrieveBlocks(startNum); err != nil {
return nil, err
}
return itr, nil
}
// RetrieveBlockByHash returns the block for given block-hash
//通過區塊哈希回傳區塊的迭代器
func (store *fsBlockStore) RetrieveBlockByHash(blockHash []byte) (*common.Block, error) {
return store.fileMgr.retrieveBlockByHash(blockHash)
}
// RetrieveBlockByNumber returns the block at a given blockchain height
//通過區塊高度回傳區塊
func (store *fsBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
return store.fileMgr.retrieveBlockByNumber(blockNum)
}
// RetrieveTxByID returns a transaction for given transaction id
//通過交易id回傳交易
func (store *fsBlockStore) RetrieveTxByID(txID string) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByID(txID)
}
// RetrieveTxByID returns a transaction for given transaction id
//通過區塊高度、交易位置回傳交易
func (store *fsBlockStore) RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByBlockNumTranNum(blockNum, tranNum)
}
//通過交易id回傳區塊
func (store *fsBlockStore) RetrieveBlockByTxID(txID string) (*common.Block, error) {
return store.fileMgr.retrieveBlockByTxID(txID)
}
//通過交易id回傳交易驗證結果
func (store *fsBlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
return store.fileMgr.retrieveTxValidationCodeByTxID(txID)
}
// Shutdown shuts down the block store
//關閉區塊存盤器
func (store *fsBlockStore) Shutdown() {
logger.Debugf("closing fs blockStore:%s", store.id)
store.fileMgr.close()
}
主要包括了區塊增加、查詢的方法
然后區塊檔案流,(但是對流的概念不怎么理解)
對賬本存盤進行一個總結:
-
賬本存盤介面定義:從整體上把握賬本存盤的方法
-
交易讀寫集:交易讀寫集的建立、校驗
-
狀態資料庫及歷史狀態資料庫:組合鍵
-
區塊檔案存盤及區塊索引:基于檔案系統
主要介面方法:
- peerledgerprovider:提供賬本層面的方法如創建賬本、打開賬本查詢賬本等,一個賬本對應三個資料狀態資料、歷史資料、區塊資料,有點類似排序節點manager
- peerledger:提供對賬本的操作,如對區塊的查詢、對狀態的查詢等,對狀態和歷史狀態的查詢分別使用queryexecutor和historyqueryexecutor實作的,
- txsimulator:交易模擬器,背書節點執行鏈碼時底層的支持
讀寫集的概念:是防止雙花的關鍵點,底層實作并不麻煩,就是進行操作前對狀態值和版本號進行校驗,
狀態資料庫:目前支持兩種資料庫couchdb、leveldb,couchdb支持模糊查詢而leveldb不支持,二者可以在開發程序中切換,有三個重要問題:
- 1、如何關聯智能合約鍵值對與底層存盤的鍵值對(資料隔離) 用組合鍵保存
- 2、如何持久化區塊的狀態資訊 交易讀寫集校驗后生成狀態更新集可用于狀態資訊的更新
- 3、如何標識最新存盤的區塊編號 在更新狀態資訊的同時記錄最新區塊的編號
歷史狀態資料庫:只保存操作包含在哪個區塊即不保存值,只保存值變動的動作
兩個重要問題:
- 1、標識某key被某交易改變 也是通過保存組合鍵來標識
- 2、如何查詢某key的變動歷史 組合鍵的前綴匹配查詢
區塊檔案存盤分為兩塊:區塊的存盤、區塊索引的存盤
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/325677.html
標籤:區塊鏈
上一篇:貨幣競爭,不是貨幣戰爭
