Implement a rebroadcast handler.

This commit implements a rebroadcast handler which deals with
rebroadcasting inventory at a random time interval between 0 and 30
minutes.  It then uses the new rebroadcast logic to ensure transactions
which were submitted via the sendrawtransaction RPC are rebroadcast until
they make it into a block.

Closes #99.
This commit is contained in:
mydesktop 2014-03-18 15:40:49 -04:00 committed by Dave Collins
parent 5fcdfb040a
commit ab002c90cc
4 changed files with 161 additions and 43 deletions

View file

@ -1062,8 +1062,16 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
b.server.txMemPool.RemoveDoubleSpends(tx)
}
// Notify registered websocket clients
if r := b.server.rpcServer; r != nil {
// Now that this block is in the blockchain we can mark all the
// transactions (except the coinbase) as no longer needing
// rebroadcasting.
for _, tx := range block.Transactions()[1:] {
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
b.server.RemoveRebroadcastInventory(iv)
}
// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
@ -1087,7 +1095,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
}
}
// Notify registered websocket clients
// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
}

View file

@ -94,6 +94,7 @@ type txMemPool struct {
outpoints map[btcwire.OutPoint]*btcutil.Tx
pennyTotal float64 // exponentially decaying total for penny spends.
lastPennyUnix int64 // unix time of last ``penny spend''
}
// isDust returns whether or not the passed transaction output amount is

View file

@ -1403,6 +1403,13 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error
return nil, err
}
// We keep track of all the sendrawtransaction request txs because we need to
// rebroadcast them if they fail to get broadcast or entered into a block; for
// instance if the client was offline when they were generated. Refer to
// server.go in /btcd.
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
s.server.AddRebroadcastInventory(iv)
return tx.Sha().String(), nil
}

184
server.go
View file

@ -6,11 +6,14 @@ package main
import (
"container/list"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcjson"
"github.com/conformal/btcwire"
"math"
"net"
"runtime"
"strconv"
@ -19,9 +22,9 @@ import (
"time"
)
// These constants are used by the DNS seed code to pick a random last seen
// time.
const (
// These constants are used by the DNS seed code to pick a random last seen
// time.
secondsIn3Days int32 = 24 * 60 * 60 * 3
secondsIn4Days int32 = 24 * 60 * 60 * 4
)
@ -46,33 +49,42 @@ type broadcastMsg struct {
excludePeers []*peer
}
// BroadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map
type broadcastInventoryAdd *btcwire.InvVect
// BroadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *btcwire.InvVect
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
nonce uint64
listeners []net.Listener
btcnet btcwire.BitcoinNet
started int32 // atomic
shutdown int32 // atomic
shutdownSched int32 // atomic
bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager
rpcServer *rpcServer
blockManager *blockManager
txMemPool *txMemPool
newPeers chan *peer
donePeers chan *peer
banPeers chan *peer
wakeup chan bool
query chan interface{}
relayInv chan *btcwire.InvVect
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan bool
nat NAT
db btcdb.Db
nonce uint64
listeners []net.Listener
btcnet btcwire.BitcoinNet
started int32 // atomic
shutdown int32 // atomic
shutdownSched int32 // atomic
bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager
rpcServer *rpcServer
blockManager *blockManager
txMemPool *txMemPool
modifyRebroadcastInv chan interface{}
newPeers chan *peer
donePeers chan *peer
banPeers chan *peer
wakeup chan bool
query chan interface{}
relayInv chan *btcwire.InvVect
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan bool
nat NAT
db btcdb.Db
}
type peerState struct {
@ -84,6 +96,34 @@ type peerState struct {
maxOutboundPeers int
}
// randomUint16Number returns a random uint16 in a specified input range. Note
// that the range is in zeroth ordering; if you pass it 1800, you will get values
// from 0 to 1800. In order to avoid modulo bias and ensure every possible
// outcome in [0, max) has equal probability, the random number must be sampled
// from a random source that has a range limited to a multiple of the modulus.
func randomUint16Number(max uint16) uint16 {
var randomNumber uint16
var limitRange = (math.MaxUint16 / max) * max
for {
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
if randomNumber < limitRange {
return (randomNumber % max)
}
}
}
// AddRebroadcastInventory dispatches a message to the rebroadcastHandler
// specifying to add an item to the rebroadcast map of InvVects
func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryAdd(iv)
}
// RemoveRebroadcastInventory dispatches a message to the rebroadcastHandler
// specifying to remove an item from the rebroadcast map of InvVects
func (s *server) RemoveRebroadcastInventory(iv *btcwire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}
func (p *peerState) Count() int {
return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len()
}
@ -706,6 +746,61 @@ func (s *server) NetTotals() (uint64, uint64) {
return s.bytesReceived, s.bytesSent
}
// rebroadcastHandler is a listener that uses a couple of channels to maintain
// a list of transactions that need to be rebroadcast. The list of tx is stored
// in their abstracted P2P form (InvVect) in a map (pendingInvs).
// Why we need this:
// We handle user submitted tx, e.g. from a wallet, via the RPC submission
// function sendrawtransactions. Because we need to ensure that user-
// submitted tx eventually enter a block, we need to retransmit them
// periodically until we see them actually enter a block.
func (s *server) rebroadcastHandler() {
timer := time.NewTimer(5 * time.Minute) // Wait 5 min before first tx rebroadcast.
pendingInvs := make(map[btcwire.InvVect]struct{})
out:
for {
select {
case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg] = struct{}{}
// When an InvVect has been added to a block, we can now remove it;
// note that we need to check if the iv is actually found in the
// map before we try to delete it, as when handleNotifyMsg finds a
// new block it cycles through the txs and sends them all
// indescriminately to this function. The if loop is cheap, so
// this should not be an issue.
case broadcastInventoryDel:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}
}
// When the timer triggers, scan through all the InvVects of RPC-submitted
// tx and cause the server to resubmit them to peers, as they have not
// been added to incoming blocks.
case <-timer.C:
for iv := range pendingInvs {
ivCopy := iv
s.RelayInventory(&ivCopy)
}
// Set the timer to go off at a random time between 0 and 1799 seconds
timer.Reset(time.Second * time.Duration(randomUint16Number(1800)))
case <-s.quit:
break out
}
}
timer.Stop()
s.wg.Done()
}
// Start begins accepting connections from peers.
func (s *server) Start() {
// Already started?
@ -726,13 +821,19 @@ func (s *server) Start() {
// managers.
s.wg.Add(1)
go s.peerHandler()
if s.nat != nil {
s.wg.Add(1)
go s.upnpUpdateThread()
}
// Start the RPC server if it's not disabled.
if !cfg.DisableRPC {
s.wg.Add(1)
// Start the rebroadcastHandler, which ensures user tx received by
// the RPC server are rebroadcast until being included in a block.
go s.rebroadcastHandler()
s.rpcServer.Start()
}
}
@ -1030,20 +1131,21 @@ func newServer(listenAddrs []string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*s
}
s := server{
nonce: nonce,
listeners: listeners,
btcnet: btcnet,
addrManager: amgr,
newPeers: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
banPeers: make(chan *peer, cfg.MaxPeers),
wakeup: make(chan bool),
query: make(chan interface{}),
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan bool),
nat: nat,
db: db,
nonce: nonce,
listeners: listeners,
btcnet: btcnet,
addrManager: amgr,
newPeers: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
banPeers: make(chan *peer, cfg.MaxPeers),
wakeup: make(chan bool),
query: make(chan interface{}),
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan bool),
modifyRebroadcastInv: make(chan interface{}),
nat: nat,
db: db,
}
bm, err := newBlockManager(&s)
if err != nil {