btcd 之 mempool
参考:btcd
- btcd提供了一个内存池mempool,用于存储还未被矿工打包的交易。
- utxo和block index存储于leveldb中,而不是本文讨论的mempool
- 被插入之前要进行一系列的正确性验证(通过mabeAcceptTransaction)。
- 如果是orphan tx(即在main chain和mempool里找不到input的所属交易tx)会暂时插入到orphans pool(通过ProcessTransaction)。
- 当新的block连接到主链时,会把block中的tx从mempool中移除,相应的也会转移orphans pool中依赖此block的tx到mempool中(通过MabeAcceptTransaction)
- 当block从主链脱离时,会重新处理block里的tx(通过MabeAcceptTransaction)
一、创建mempool对象
- 在server被创建的时候调用此方法生成一个mempool对象
-
可以看到,pool用来存储正常的tx,orphans存储orphan tx
// New returns a new memory pool for validating and storing standalone // transactions until they are mined into a block. func New(cfg *Config) *TxPool { return &TxPool{ cfg: *cfg, pool: make(map[chainhash.Hash]*TxDesc), orphans: make(map[chainhash.Hash]*orphanTx), orphansByPrev: make(map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx), nextExpireScan: time.Now().Add(orphanExpireScanInterval), outpoints: make(map[wire.OutPoint]*btcutil.Tx), } }
二、ProcessTransaction
- 该方法用来处理通过rpc请求(handleSendRawTransaction)发送的的rawtx和通过p2p网络(OnTx)同步的tx
-
支持orphan tx的插入
// ProcessTransaction is the main workhorse for handling insertion of new // free-standing transactions into the memory pool. It includes functionality // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. // // It returns a slice of transactions added to the mempool. When the // error is nil, the list will include the passed transaction itself along // with any additional orphan transaactions that were added as a result of // the passed one being accepted. // // This function is safe for concurrent access. func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { log.Tracef("Processing transaction %v", tx.Hash()) // Protect concurrent access. mp.mtx.Lock() defer mp.mtx.Unlock() // Potentially accept the transaction to the memory pool. missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit, true) if err != nil { return nil, err } if len(missingParents) == 0 { // Accept any orphan transactions that depend on this // transaction (they may no longer be orphans if all inputs // are now available) and repeat for those accepted // transactions until there are no more. newTxs := mp.processOrphans(tx) acceptedTxs := make([]*TxDesc, len(newTxs)+1) // Add the parent transaction first so remote nodes // do not add orphans. acceptedTxs[0] = txD copy(acceptedTxs[1:], newTxs) return acceptedTxs, nil } // The transaction is an orphan (has inputs missing). Reject // it if the flag to allow orphans is not set. if !allowOrphan { // Only use the first missing parent transaction in // the error message. // // NOTE: RejectDuplicate is really not an accurate // reject code here, but it matches the reference // implementation and there isn't a better choice due // to the limited number of reject codes. Missing // inputs is assumed to mean they are already spent // which is not really always the case. str := fmt.Sprintf("orphan transaction %v references "+ "outputs of unknown or fully-spent "+ "transaction %v", tx.Hash(), missingParents[0]) return nil, txRuleError(wire.RejectDuplicate, str) } // Potentially add the orphan transaction to the orphan pool. err = mp.maybeAddOrphan(tx, tag) return nil, err }
三、MaybeAcceptTransaction
- 当block 从主链 connect / disconnect 的时候(blockchain.NTBlockConnected/blockchain.NTBlockDisconnected),使用此函数处理block中的tx(当然不会包含coinbase)
-
不支持orphan tx的插入
// MaybeAcceptTransaction is the main workhorse for handling insertion of new // free-standing transactions into a memory pool. It includes functionality // such as rejecting duplicate transactions, ensuring transactions follow all // rules, detecting orphan transactions, and insertion into the memory pool. // // If the transaction is an orphan (missing parent transactions), the // transaction is NOT added to the orphan pool, but each unknown referenced // parent is returned. Use ProcessTransaction instead if new orphans should // be added to the orphan pool. // // This function is safe for concurrent access. func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) { // Protect concurrent access. mp.mtx.Lock() hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true) mp.mtx.Unlock() return hashes, txD, err }
四、mabeAcceptTransaction
- ProcessTransaction和MabeAcceptTransaction最终都调用的该方法
- 该方法会做一系列合法性验证然后将tx插入mempool
五、ProcessOrhpans
-
当有新的block connect 到主链时,对block中的每一个tx都检测是否存在基于它的orphan tx,并将存在的tx移到mempool中
// ProcessOrphans determines if there are any orphans which depend on the passed // transaction hash (it is possible that they are no longer orphans) and // potentially accepts them to the memory pool. It repeats the process for the // newly accepted transactions (to detect further orphans which may no longer be // orphans) until there are no more. // // It returns a slice of transactions added to the mempool. A nil slice means // no transactions were moved from the orphan pool to the mempool. // // This function is safe for concurrent access. func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc { mp.mtx.Lock() acceptedTxns := mp.processOrphans(acceptedTx) mp.mtx.Unlock() return acceptedTxns }
六、Fetchtransaction
- 从mem pool中获取对应hash的tx,不包含orphans pool
-
在rpc请求的一些方法(searchrawtransactions、getrawtransaction、gettxout、)和p2p网络发送txmsg使用
// FetchTransaction returns the requested transaction from the transaction pool. // This only fetches from the main transaction pool and does not include // orphans. // // This function is safe for concurrent access. func (mp *TxPool) FetchTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) { // Protect concurrent access. mp.mtx.RLock() txDesc, exists := mp.pool[*txHash] mp.mtx.RUnlock() if exists { return txDesc.Tx, nil } return nil, fmt.Errorf("transaction is not in the pool") }
七、MiningDescs
-
rpc getBlockTemplate 使用此方法生成挖矿需要的数据
// MiningDescs returns a slice of mining descriptors for all the transactions // in the pool. // // This is part of the mining.TxSource interface implementation and is safe for // concurrent access as required by the interface contract. func (mp *TxPool) MiningDescs() []*mining.TxDesc { mp.mtx.RLock() descs := make([]*mining.TxDesc, len(mp.pool)) i := 0 for _, desc := range mp.pool { descs[i] = &desc.TxDesc i++ } mp.mtx.RUnlock() return descs }
八、RemoveTransaction
- 从mempool中移除交易
-
当有新的block connect到主链时,移除block中的transaction
// RemoveTransaction removes the passed transaction from the mempool. When the // removeRedeemers flag is set, any transactions that redeem outputs from the // removed transaction will also be removed recursively from the mempool, as // they would otherwise become orphans. // // This function is safe for concurrent access. func (mp *TxPool) RemoveTransaction(tx *btcutil.Tx, removeRedeemers bool) { // Protect concurrent access. mp.mtx.Lock() mp.removeTransaction(tx, removeRedeemers) mp.mtx.Unlock() }
九、checkPoolDoubleSpend
- 双花检测
-
通过对比 待插入txin的PreviousOutPoint和内存池里边的已经存在的outpoints
// checkPoolDoubleSpend checks whether or not the passed transaction is // attempting to spend coins already spent by other transactions in the pool. // Note it does not check for double spends against transactions already in the // main chain. // // This function MUST be called with the mempool lock held (for reads). func (mp *TxPool) checkPoolDoubleSpend(tx *hcutil.Tx, txType stake.TxType) error { for i, txIn := range tx.MsgTx().TxIn { // We don't care about double spends of stake bases. if i == 0 && (txType == stake.TxTypeSSGen || txType == stake.TxTypeSSRtx) { continue } if txR, exists := mp.outpoints[txIn.PreviousOutPoint]; exists { str := fmt.Sprintf("transaction %v in the pool "+ "already spends the same coins", txR.Hash()) return txRuleError(wire.RejectDuplicate, str) } } return nil }
-
outpoint 包含hash和索引
// OutPoint defines a HC data type that is used to track previous // transaction outputs. type OutPoint struct { Hash chainhash.Hash Index uint32 Tree int8 }
十、PruneExpiredTx
- 根据高度移除过期交易
-
当从其他peer同步到一个块的时候和重组的时候发生
// PruneExpiredTx prunes expired transactions from the mempool that may no longer // be able to be included into a block. func (mp *TxPool) PruneExpiredTx(height int64) { // Protect concurrent access. mp.mtx.Lock() mp.pruneExpiredTx(height) mp.mtx.Unlock() } func (mp *TxPool) pruneExpiredTx(height int64) { for _, tx := range mp.pool { if tx.Tx.MsgTx().Expiry != 0 { if height >= int64(tx.Tx.MsgTx().Expiry) { log.Debugf("Pruning expired transaction %v "+ "from the mempool", tx.Tx.Hash()) mp.removeTransaction(tx.Tx, true) } } } }
有疑问加站长微信联系(非本文作者)