mempool: Associated tag with orphan txns.

This allows a caller-provided tag to be associated with orphan
transactions.  This is useful since the caller can use the tag for
purposes such as keeping track of which peers orphans were first seen
from.

Also, since a parameter is required now anyways, it associates the peer
ID with processed transactions from remote peers.
This commit is contained in:
Dave Collins 2016-10-28 12:48:54 -05:00
parent 2615fa0849
commit e992d55822
No known key found for this signature in database
GPG key ID: B8904D9D9C93D1F2
4 changed files with 25 additions and 16 deletions

View file

@ -413,7 +413,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// memory pool, orphan handling, etc. // memory pool, orphan handling, etc.
allowOrphans := cfg.MaxOrphanTxs > 0 allowOrphans := cfg.MaxOrphanTxs > 0
acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx, acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx,
allowOrphans, true) allowOrphans, true, mempool.Tag(tmsg.peer.ID()))
// Remove transaction from request maps. Either the mempool/chain // Remove transaction from request maps. Either the mempool/chain
// already knows about it and as such we shouldn't have any more // already knows about it and as such we shouldn't have any more

View file

@ -40,6 +40,11 @@ const (
orphanExpireScanInterval = time.Minute * 5 orphanExpireScanInterval = time.Minute * 5
) )
// Tag represents an identifier to use for tagging orphan transactions. The
// caller may choose any scheme it desires, however it is common to use peer IDs
// so that orphans can be identified by which peer first relayed them.
type Tag uint64
// Config is a descriptor containing the memory pool configuration. // Config is a descriptor containing the memory pool configuration.
type Config struct { type Config struct {
// Policy defines the various mempool configuration options related // Policy defines the various mempool configuration options related
@ -132,6 +137,7 @@ type TxDesc struct {
// to it such as an expiration time to help prevent caching the orphan forever. // to it such as an expiration time to help prevent caching the orphan forever.
type orphanTx struct { type orphanTx struct {
tx *btcutil.Tx tx *btcutil.Tx
tag Tag
expiration time.Time expiration time.Time
} }
@ -268,7 +274,7 @@ func (mp *TxPool) limitNumOrphans() error {
// addOrphan adds an orphan transaction to the orphan pool. // addOrphan adds an orphan transaction to the orphan pool.
// //
// This function MUST be called with the mempool lock held (for writes). // This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) addOrphan(tx *btcutil.Tx) { func (mp *TxPool) addOrphan(tx *btcutil.Tx, tag Tag) {
// Nothing to do if no orphans are allowed. // Nothing to do if no orphans are allowed.
if mp.cfg.Policy.MaxOrphanTxs <= 0 { if mp.cfg.Policy.MaxOrphanTxs <= 0 {
return return
@ -281,6 +287,7 @@ func (mp *TxPool) addOrphan(tx *btcutil.Tx) {
mp.orphans[*tx.Hash()] = &orphanTx{ mp.orphans[*tx.Hash()] = &orphanTx{
tx: tx, tx: tx,
tag: tag,
expiration: time.Now().Add(orphanTTL), expiration: time.Now().Add(orphanTTL),
} }
for _, txIn := range tx.MsgTx().TxIn { for _, txIn := range tx.MsgTx().TxIn {
@ -298,7 +305,7 @@ func (mp *TxPool) addOrphan(tx *btcutil.Tx) {
// maybeAddOrphan potentially adds an orphan to the orphan pool. // maybeAddOrphan potentially adds an orphan to the orphan pool.
// //
// This function MUST be called with the mempool lock held (for writes). // This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) maybeAddOrphan(tx *btcutil.Tx) error { func (mp *TxPool) maybeAddOrphan(tx *btcutil.Tx, tag Tag) error {
// Ignore orphan transactions that are too large. This helps avoid // Ignore orphan transactions that are too large. This helps avoid
// a memory exhaustion attack based on sending a lot of really large // a memory exhaustion attack based on sending a lot of really large
// orphans. In the case there is a valid transaction larger than this, // orphans. In the case there is a valid transaction larger than this,
@ -318,7 +325,7 @@ func (mp *TxPool) maybeAddOrphan(tx *btcutil.Tx) error {
} }
// Add the orphan if the none of the above disqualified it. // Add the orphan if the none of the above disqualified it.
mp.addOrphan(tx) mp.addOrphan(tx, tag)
return nil return nil
} }
@ -981,7 +988,7 @@ func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
// the passed one being accepted. // the passed one being accepted.
// //
// This function is safe for concurrent access. // This function is safe for concurrent access.
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) ([]*TxDesc, error) { func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
log.Tracef("Processing transaction %v", tx.Hash()) log.Tracef("Processing transaction %v", tx.Hash())
// Protect concurrent access. // Protect concurrent access.
@ -1030,7 +1037,7 @@ func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool
} }
// Potentially add the orphan transaction to the orphan pool. // Potentially add the orphan transaction to the orphan pool.
err = mp.maybeAddOrphan(tx) err = mp.maybeAddOrphan(tx, tag)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -409,7 +409,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// none are evicted). // none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] { for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
false) false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -432,7 +432,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// to ensure it has no bearing on whether or not already existing // to ensure it has no bearing on whether or not already existing
// orphans in the pool are linked. // orphans in the pool are linked.
acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0],
false, false) false, false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -471,7 +471,7 @@ func TestOrphanReject(t *testing.T) {
// Ensure orphans are rejected when the allow orphans flag is not set. // Ensure orphans are rejected when the allow orphans flag is not set.
for _, tx := range chainedTxns[1:] { for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false,
false) false, 0)
if err == nil { if err == nil {
t.Fatalf("ProcessTransaction: did not fail on orphan "+ t.Fatalf("ProcessTransaction: did not fail on orphan "+
"%v when allow orphans flag is false", tx.Hash()) "%v when allow orphans flag is false", tx.Hash())
@ -528,7 +528,7 @@ func TestOrphanEviction(t *testing.T) {
// all accepted. This will cause an eviction. // all accepted. This will cause an eviction.
for _, tx := range chainedTxns[1:] { for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
false) false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -592,7 +592,7 @@ func TestBasicOrphanRemoval(t *testing.T) {
// none are evicted). // none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] { for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
false) false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -667,7 +667,7 @@ func TestOrphanChainRemoval(t *testing.T) {
// none are evicted). // none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] { for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
false) false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -730,7 +730,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
// except the final one. // except the final one.
for _, tx := range chainedTxns[1:maxOrphans] { for _, tx := range chainedTxns[1:maxOrphans] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
false) false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+ t.Fatalf("ProcessTransaction: failed to accept valid "+
"orphan %v", err) "orphan %v", err)
@ -756,7 +756,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
t.Fatalf("unable to create signed tx: %v", err) t.Fatalf("unable to create signed tx: %v", err)
} }
acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx,
true, false) true, false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid orphan %v", t.Fatalf("ProcessTransaction: failed to accept valid orphan %v",
err) err)
@ -775,7 +775,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
// This will cause the shared output to become a concrete spend which // This will cause the shared output to become a concrete spend which
// will in turn must cause the double spending orphan to be removed. // will in turn must cause the double spending orphan to be removed.
acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0],
false, false) false, false, 0)
if err != nil { if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err) t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err)
} }

View file

@ -3424,8 +3424,10 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
} }
} }
// Use 0 for the tag to represent local node.
tx := btcutil.NewTx(&msgTx) tx := btcutil.NewTx(&msgTx)
acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, false) acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false,
false, 0)
if err != nil { if err != nil {
// When the error is a rule error, it means the transaction was // When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong, // simply rejected as opposed to something actually going wrong,