Fix #138 by dynamically updating heights of peers

In order to avoid prior situations of stalled syncs due to
outdated peer height data, we now update block heights up peers in
real-time as we learn of their announced
blocks.

Updates happen when:
   * A peer sends us an orphan block. We update based on
     the height embedded in the scriptSig for the coinbase tx
   * When a peer sends us an inv for a block we already know
     of
   * When peers announce new blocks. Subsequent
     announcements that lost the announcement race are
     recognized and peer heights are updated accordingly

Additionally, the `getpeerinfo` command has been modified
to include both the starting height, and current height of
connected peers.

Docs have been updated with `getpeerinfo` extension.
This commit is contained in:
Olaoluwa Osuntokun 2015-02-11 12:39:11 -08:00
parent 7c46f213e1
commit 1bf564d963
6 changed files with 199 additions and 22 deletions

View file

@ -131,7 +131,7 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
// blocks whose version is the serializedHeightVersion or
// newer once a majority of the network has upgraded. This is
// part of BIP0034.
if blockHeader.Version >= serializedHeightVersion &&
if ShouldHaveSerializedBlockHeight(blockHeader) &&
b.isMajorityVersion(serializedHeightVersion, prevNode,
b.chainParams.BlockEnforceNumRequired) {

View file

@ -89,10 +89,19 @@ func isNullOutpoint(outpoint *wire.OutPoint) bool {
return false
}
// IsCoinBaseTx determines whether or not a transaction is a coinbase. A
// coinbase is a special transaction created by miners that has no inputs. This
// is represented in the block chain by a transaction with a single input that
// has a previous output transaction index set to the maximum value along with a
// ShouldHaveSerializedBlockHeight determines if a block should have a
// serialized block height embedded within the scriptSig of its
// coinbase transaction. Judgement is based on the block version in the block
// header. Blocks with version 2 and above satisfy this criteria. See BIP0034
// for further information.
func ShouldHaveSerializedBlockHeight(header *wire.BlockHeader) bool {
return header.Version >= serializedHeightVersion
}
// IsCoinBaseTx determines whether or not a transaction is a coinbase. A coinbase
// is a special transaction created by miners that has no inputs. This is
// represented in the block chain by a transaction with a single input that has
// a previous output transaction index set to the maximum value along with a
// zero hash.
//
// This function only differs from IsCoinBase in that it works with a raw wire
@ -569,16 +578,18 @@ func CheckBlockSanity(block *btcutil.Block, powLimit *big.Int, timeSource Median
return checkBlockSanity(block, powLimit, timeSource, BFNone)
}
// checkSerializedHeight checks if the signature script in the passed
// transaction starts with the serialized block height of wantHeight.
func checkSerializedHeight(coinbaseTx *btcutil.Tx, wantHeight int64) error {
// ExtractCoinbaseHeight attempts to extract the height of the block
// from the scriptSig of a coinbase transaction. Coinbase heights
// are only present in blocks of version 2 or later. This was added as part of
// BIP0034.
func ExtractCoinbaseHeight(coinbaseTx *btcutil.Tx) (int64, error) {
sigScript := coinbaseTx.MsgTx().TxIn[0].SignatureScript
if len(sigScript) < 1 {
str := "the coinbase signature script for blocks of " +
"version %d or greater must start with the " +
"length of the serialized block height"
str = fmt.Sprintf(str, serializedHeightVersion)
return ruleError(ErrMissingCoinbaseHeight, str)
return 0, ruleError(ErrMissingCoinbaseHeight, str)
}
serializedLen := int(sigScript[0])
@ -587,19 +598,30 @@ func checkSerializedHeight(coinbaseTx *btcutil.Tx, wantHeight int64) error {
"version %d or greater must start with the " +
"serialized block height"
str = fmt.Sprintf(str, serializedLen)
return ruleError(ErrMissingCoinbaseHeight, str)
return 0, ruleError(ErrMissingCoinbaseHeight, str)
}
serializedHeightBytes := make([]byte, 8, 8)
copy(serializedHeightBytes, sigScript[1:serializedLen+1])
serializedHeight := binary.LittleEndian.Uint64(serializedHeightBytes)
if int64(serializedHeight) != wantHeight {
return int64(serializedHeight), nil
}
// checkSerializedHeight checks if the signature script in the passed
// transaction starts with the serialized block height of wantHeight.
func checkSerializedHeight(coinbaseTx *btcutil.Tx, wantHeight int64) error {
serializedHeight, err := ExtractCoinbaseHeight(coinbaseTx)
if err != nil {
return err
}
if serializedHeight != wantHeight {
str := fmt.Sprintf("the coinbase signature script serialized "+
"block height is %d when %d was expected",
serializedHeight, wantHeight)
return ruleError(ErrBadCoinbaseHeight, str)
}
return nil
}

View file

@ -587,8 +587,40 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
return
}
// Meta-data about the new block this peer is reporting. We use this
// below to update this peer's lastest block height and the heights of
// other peers based on their last announced block sha. This allows us
// to dynamically update the block heights of peers, avoiding stale heights
// when looking for a new sync peer. Upon acceptance of a block or
// recognition of an orphan, we also use this information to update
// the block heights over other peers who's invs may have been ignored
// if we are actively syncing while the chain is not yet current or
// who may have lost the lock announcment race.
var heightUpdate int32
var blkShaUpdate *wire.ShaHash
// Request the parents for the orphan block from the peer that sent it.
if isOrphan {
// We've just received an orphan block from a peer. In order
// to update the height of the peer, we try to extract the
// block height from the scriptSig of the coinbase transaction.
// Extraction is only attempted if the block's version is
// high enough (ver 2+).
header := &bmsg.block.MsgBlock().Header
if blockchain.ShouldHaveSerializedBlockHeight(header) {
coinbaseTx := bmsg.block.Transactions()[0]
cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx)
if err != nil {
bmgrLog.Warnf("Unable to extract height from "+
"coinbase tx: %v", err)
} else {
bmgrLog.Debugf("Extracted height of %v from "+
"orphan block", cbHeight)
heightUpdate = int32(cbHeight)
blkShaUpdate = blockSha
}
}
orphanRoot := b.blockChain.GetOrphanRoot(blockSha)
locator, err := b.blockChain.LatestBlockLocator()
if err != nil {
@ -600,7 +632,6 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
} else {
// When the block is not an orphan, log information about it and
// update the chain state.
b.progressLogger.LogBlockHeight(bmsg.block)
// Query the db for the latest best block since the block
@ -609,6 +640,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
newestSha, newestHeight, _ := b.server.db.NewestSha()
b.updateChainState(newestSha, newestHeight)
// Update this peer's latest block height, for future
// potential sync node candidancy.
heightUpdate = int32(newestHeight)
blkShaUpdate = newestSha
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
@ -618,6 +654,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
}
}
// Update the block height for this peer. But only send a message to
// the server for updating peer heights if this is an orphan or our
// chain is "current". This avoid sending a spammy amount of messages
// if we're syncing the chain from scratch.
if blkShaUpdate != nil && heightUpdate != 0 {
bmsg.peer.UpdateLastBlockHeight(heightUpdate)
if isOrphan || b.current() {
go b.server.UpdatePeerHeights(blkShaUpdate, int32(heightUpdate), bmsg.peer)
}
}
// Sync the db to disk.
b.server.db.Sync()
@ -856,12 +902,6 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) {
// handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly.
func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Ignore invs from peers that aren't the sync if we are not current.
// Helps prevent fetching a mass of orphans.
if imsg.peer != b.syncPeer && !b.current() {
return
}
// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
@ -873,6 +913,36 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}
// If this inv contains a block annoucement, and this isn't coming from
// our current sync peer or we're current, then update the last
// announced block for this peer. We'll use this information later to
// update the heights of peers based on blocks we've accepted that they
// previously announced.
if lastBlock != -1 && (imsg.peer != b.syncPeer || b.current()) {
imsg.peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
}
// Ignore invs from peers that aren't the sync if we are not current.
// Helps prevent fetching a mass of orphans.
if imsg.peer != b.syncPeer && !b.current() {
return
}
// If our chain is current and a peer announces a block we already
// know of, then update their current block height.
if lastBlock != -1 && b.current() {
exists, err := b.server.db.ExistsSha(&invVects[lastBlock].Hash)
if err == nil && exists {
blkHeight, err := b.server.db.FetchBlockHeightBySha(&invVects[lastBlock].Hash)
if err != nil {
bmgrLog.Warnf("Unable to fetch block height for block (sha: %v), %v",
&invVects[lastBlock].Hash, err)
} else {
imsg.peer.UpdateLastBlockHeight(int32(blkHeight))
}
}
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains

View file

@ -393,8 +393,8 @@ the method name for further details such as parameter and return information.
|Method|getpeerinfo|
|Parameters|None|
|Description|Returns data about each connected network peer as an array of json objects.|
|Returns|`[`<br />&nbsp;&nbsp;`{`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"addr": "host:port", (string) the ip address and port of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"services": "00000001", (string) the services supported by the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastrecv": n, (numeric) time the last message was received in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastsend": n, (numeric) time the last message was sent in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytessent": n, (numeric) total bytes sent`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytesrecv": n, (numeric) total bytes received`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"conntime": n, (numeric) time the connection was made in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingtime": n, (numeric) number of microseconds the last ping took`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingwait": n, (numeric) number of microseconds a queued ping has been waiting for a response`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"version": n, (numeric) the protocol version of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"subver": "useragent", (string) the user agent of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"inbound": true_or_false, (boolean) whether or not the peer is an inbound connection`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"startingheight": n, (numeric) the latest block height the peer knew about when the connection was established`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"syncnode": true_or_false, (boolean) whether or not the peer is the sync peer`<br />&nbsp;&nbsp;`}, ...`<br />`]`|
|Example Return|`[`<br />&nbsp;&nbsp;`{`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"addr": "178.172.xxx.xxx:8333",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"services": "00000001",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastrecv": 1388183523,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastsend": 1388185470,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytessent": 287592965,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytesrecv": 780340,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"conntime": 1388182973,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingtime": 405551,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingwait": 183023,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"version": 70001,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"subver": "/btcd:0.4.0/",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"inbound": false,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"startingheight": 276921,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"syncnode": true,`<br />&nbsp;&nbsp;`}`<br />`]`|
|Returns|`[`<br />&nbsp;&nbsp;`{`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"addr": "host:port", (string) the ip address and port of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"services": "00000001", (string) the services supported by the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastrecv": n, (numeric) time the last message was received in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastsend": n, (numeric) time the last message was sent in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytessent": n, (numeric) total bytes sent`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytesrecv": n, (numeric) total bytes received`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"conntime": n, (numeric) time the connection was made in seconds since 1 Jan 1970 GMT`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingtime": n, (numeric) number of microseconds the last ping took`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingwait": n, (numeric) number of microseconds a queued ping has been waiting for a response`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"version": n, (numeric) the protocol version of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"subver": "useragent", (string) the user agent of the peer`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"inbound": true_or_false, (boolean) whether or not the peer is an inbound connection`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"startingheight": n, (numeric) the latest block height the peer knew about when the connection was established`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"currentheight": n, (numeric) the latest block height the peer is known to have relayed since connected`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"syncnode": true_or_false, (boolean) whether or not the peer is the sync peer`<br />&nbsp;&nbsp;`}, ...`<br />`]`|
|Example Return|`[`<br />&nbsp;&nbsp;`{`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"addr": "178.172.xxx.xxx:8333",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"services": "00000001",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastrecv": 1388183523,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"lastsend": 1388185470,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytessent": 287592965,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"bytesrecv": 780340,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"conntime": 1388182973,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingtime": 405551,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"pingwait": 183023,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"version": 70001,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"subver": "/btcd:0.4.0/",`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"inbound": false,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"startingheight": 276921,`<br />&nbsp;&nbsp;&nbsp;&nbsp;`"currentheight": 276955,`<br/>&nbsp;&nbsp;&nbsp;&nbsp;`"syncnode": true,`<br />&nbsp;&nbsp;`}`<br />`]`|
[Return to Overview](#MethodOverview)<br />
***

24
peer.go
View file

@ -191,7 +191,9 @@ type peer struct {
bytesReceived uint64
bytesSent uint64
userAgent string
startingHeight int32
lastBlock int32
lastAnnouncedBlock *wire.ShaHash
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
@ -215,6 +217,27 @@ func (p *peer) isKnownInventory(invVect *wire.InvVect) bool {
return false
}
// UpdateLastBlockHeight updates the last known block for the peer. It is safe
// for concurrent access.
func (p *peer) UpdateLastBlockHeight(newHeight int32) {
p.StatsMtx.Lock()
defer p.StatsMtx.Unlock()
peerLog.Tracef("Updating last block height of peer %v from %v to %v",
p.addr, p.lastBlock, newHeight)
p.lastBlock = int32(newHeight)
}
// UpdateLastAnnouncedBlock updates meta-data about the last block sha this
// peer is known to have announced. It is safe for concurrent access.
func (p *peer) UpdateLastAnnouncedBlock(blkSha *wire.ShaHash) {
p.StatsMtx.Lock()
defer p.StatsMtx.Unlock()
peerLog.Tracef("Updating last blk for peer %v, %v", p.addr, blkSha)
p.lastAnnouncedBlock = blkSha
}
// AddKnownInventory adds the passed inventory to the cache of known inventory
// for the peer. It is safe for concurrent access.
func (p *peer) AddKnownInventory(invVect *wire.InvVect) {
@ -400,6 +423,7 @@ func (p *peer) handleVersionMsg(msg *wire.MsgVersion) {
peerLog.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
// Set the supported services for the peer to what the remote peer
// advertised.

View file

@ -5,6 +5,7 @@
package main
import (
"bytes"
"container/list"
"crypto/rand"
"encoding/binary"
@ -70,6 +71,18 @@ type relayMsg struct {
data interface{}
}
// updatePeerHeightsMsg is a message sent from the blockmanager to the server
// after a new block has been accepted. The purpose of the message is to update
// the heights of peers that were known to announce the block before we
// connected it to the main chain or recognized it as an orphan. With these
// updates, peer heights will be kept up to date, allowing for fresh data when
// selecting sync peer candidacy.
type updatePeerHeightsMsg struct {
newSha *wire.ShaHash
newHeight int32
originPeer *peer
}
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
@ -96,6 +109,7 @@ type server struct {
query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
peerHeightsUpdate chan updatePeerHeightsMsg
wg sync.WaitGroup
quit chan struct{}
nat NAT
@ -185,6 +199,35 @@ func (p *peerState) forAllPeers(closure func(p *peer)) {
p.forAllOutboundPeers(closure)
}
// handleUpdatePeerHeight updates the heights of all peers who were known to
// announce a block we recently accepted.
func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
state.forAllPeers(func(p *peer) {
// The origin peer should already have the updated height.
if p == umsg.originPeer {
return
}
// Skip this peer if it hasn't recently announced any new blocks.
p.StatsMtx.Lock()
if p.lastAnnouncedBlock == nil {
p.StatsMtx.Unlock()
return
}
latestBlkSha := p.lastAnnouncedBlock.Bytes()
p.StatsMtx.Unlock()
// If the peer has recently announced a block, and this block
// matches our newly accepted block, then update their block
// height.
if bytes.Equal(latestBlkSha, umsg.newSha.Bytes()) {
p.UpdateLastBlockHeight(umsg.newHeight)
p.UpdateLastAnnouncedBlock(nil)
}
})
}
// handleAddPeerMsg deals with adding new peers. It is invoked from the
// peerHandler goroutine.
func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
@ -414,7 +457,8 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
Version: p.protocolVersion,
SubVer: p.userAgent,
Inbound: p.inbound,
StartingHeight: p.lastBlock,
StartingHeight: p.startingHeight,
CurrentHeight: p.lastBlock,
BanScore: 0,
SyncNode: p == syncPeer,
}
@ -601,6 +645,10 @@ out:
case p := <-s.donePeers:
s.handleDonePeerMsg(state, p)
// Block accepted in mainchain or orphan, update peer height.
case umsg := <-s.peerHeightsUpdate:
s.handleUpdatePeerHeights(state, umsg)
// Peer to ban.
case p := <-s.banPeers:
s.handleBanPeerMsg(state, p)
@ -818,6 +866,18 @@ func (s *server) NetTotals() (uint64, uint64) {
return s.bytesReceived, s.bytesSent
}
// UpdatePeerHeights updates the heights of all peers who have have announced
// the latest connected main chain block, or a recognized orphan. These height
// updates allow us to dynamically refresh peer heights, ensuring sync peer
// selection has access to the latest block heights for each peer.
func (s *server) UpdatePeerHeights(latestBlkSha *wire.ShaHash, latestHeight int32, updateSource *peer) {
s.peerHeightsUpdate <- updatePeerHeightsMsg{
newSha: latestBlkSha,
newHeight: latestHeight,
originPeer: updateSource,
}
}
// rebroadcastHandler keeps track of user submitted inventories that we have
// sent out but have not yet made it into a block. We periodically rebroadcast
// them in case our peers restarted or otherwise lost track of them.
@ -1246,6 +1306,7 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
peerHeightsUpdate: make(chan updatePeerHeightsMsg),
nat: nat,
db: db,
timeSource: blockchain.NewMedianTime(),