Optimize, prepare for eventual sendheaders use, add sanity check

This commit is contained in:
Alex 2017-03-28 16:06:46 -06:00 committed by Olaoluwa Osuntokun
parent 0a8de495cc
commit 4d82af7f8e
3 changed files with 108 additions and 30 deletions

View file

@ -131,6 +131,7 @@ type blockManager struct {
reorgList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
lastRequested chainhash.Hash
minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor
@ -604,19 +605,31 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// If this is the sync peer or we're current, get the headers
// for the announced blocks and update the last announced block.
if lastBlock != -1 && (imsg.peer == b.syncPeer || b.current()) {
// Make a locator starting from the latest known header we've
// processed.
locator := make(blockchain.BlockLocator, 0,
wire.MaxBlockLocatorsPerMsg)
lastHash := b.headerList.Back().Value.(*headerNode).header.BlockHash()
locator = append(locator, &lastHash)
// Add locator from the database as backup.
knownLocator, err := b.server.LatestBlockLocator()
if err == nil {
locator = append(locator, knownLocator...)
// Only send getheaders if we don't already know about the last
// block hash being announced.
if lastHash != invVects[lastBlock].Hash &&
b.lastRequested != invVects[lastBlock].Hash {
// Make a locator starting from the latest known header
// we've processed.
locator := make(blockchain.BlockLocator, 0,
wire.MaxBlockLocatorsPerMsg)
locator = append(locator, &lastHash)
// Add locator from the database as backup.
knownLocator, err := b.server.LatestBlockLocator()
if err == nil {
locator = append(locator, knownLocator...)
}
// Get headers based on locator.
err = imsg.peer.PushGetHeadersMsg(locator,
&invVects[lastBlock].Hash)
if err != nil {
log.Warnf("Failed to send getheaders message "+
"to peer %s: %s", imsg.peer.Addr(), err)
return
}
b.lastRequested = invVects[lastBlock].Hash
}
// Get headers based on locator.
imsg.peer.PushGetHeadersMsg(locator, &invVects[lastBlock].Hash)
}
}
@ -716,6 +729,12 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
if hmsg.peer != b.syncPeer && !b.current() {
return
}
// Check if this is the last block we know of. This is
// a shortcut for sendheaders so that each redundant
// header doesn't cause a disk read.
if blockHash == prevHash {
continue
}
// Check if this block is known. If so, we continue to
// the next one.
_, _, err := b.server.GetBlockByHash(blockHash)
@ -764,7 +783,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
height: int32(backHeight),
})
totalWork := big.NewInt(0)
for _, reorgHeader := range msg.Headers[i:] {
for j, reorgHeader := range msg.Headers[i:] {
err = b.checkHeaderSanity(reorgHeader,
maxTimestamp, true)
if err != nil {
@ -776,6 +795,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
}
totalWork.Add(totalWork,
blockchain.CalcWork(reorgHeader.Bits))
b.reorgList.PushBack(&headerNode{
header: reorgHeader,
height: int32(backHeight+1) + int32(j),
})
}
log.Tracef("Sane reorg attempted. Total work from "+
"reorg chain: %v", totalWork)
@ -808,13 +831,18 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
log.Tracef("Total work from known chain: %v", knownWork)
// Compare the two work totals and reject the new chain
// if it doesn't have more work than the previously
// known chain.
if knownWork.Cmp(totalWork) >= 0 {
log.Warnf("Reorg attempt that does not have "+
"more work than known chain from peer "+
"%s -- disconnecting", hmsg.peer.Addr())
// known chain. Disconnect if it's actually less than
// the known chain.
switch knownWork.Cmp(totalWork) {
case 1:
log.Warnf("Reorg attempt that has less work "+
"than known chain from peer %s -- "+
"disconnecting", hmsg.peer.Addr())
hmsg.peer.Disconnect()
fallthrough
case 0:
return
default:
}
// At this point, we have a valid reorg, so we roll
// back the existing chain and add the new block header.
@ -884,18 +912,20 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
//return
}
// Request the next batch of headers starting from the latest known
// header and ending with the next checkpoint.
locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
nextHash := zeroHash
if b.nextCheckpoint != nil {
nextHash = *b.nextCheckpoint.Hash
}
err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash)
if err != nil {
log.Warnf("Failed to send getheaders message to "+
"peer %s: %s", hmsg.peer.Addr(), err)
return
// If not current, request the next batch of headers starting from the
// latest known header and ending with the next checkpoint.
if !b.current() {
locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
nextHash := zeroHash
if b.nextCheckpoint != nil {
nextHash = *b.nextCheckpoint.Hash
}
err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash)
if err != nil {
log.Warnf("Failed to send getheaders message to "+
"peer %s: %s", hmsg.peer.Addr(), err)
return
}
}
}

View file

@ -292,6 +292,36 @@ func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) {
return header, height, nil
}
// CheckConnectivity cycles through all of the block headers, from last to
// first, and makes sure they all connect to each other.
func CheckConnectivity(tx walletdb.Tx) error {
header, height, err := LatestBlock(tx)
if err != nil {
return fmt.Errorf("Couldn't retrieve latest block: %s", err)
}
for height > 0 {
newheader, newheight, err := GetBlockByHash(tx,
header.PrevBlock)
if err != nil {
return fmt.Errorf("Couldn't retrieve block %s: %s",
header.PrevBlock, err)
}
if newheader.BlockHash() != header.PrevBlock {
return fmt.Errorf("Block %s doesn't match block %s's "+
"PrevBlock (%s)", newheader.BlockHash(),
header.BlockHash(), header.PrevBlock)
}
if newheight != height-1 {
return fmt.Errorf("Block %s doesn't have correct "+
"height: want %d, got %d",
newheader.BlockHash(), height-1, newheight)
}
header = newheader
height = newheight
}
return nil
}
// BlockLocatorFromHash returns a block locator based on the provided hash.
func BlockLocatorFromHash(tx walletdb.Tx, hash chainhash.Hash) blockchain.BlockLocator {
locator := make(blockchain.BlockLocator, 0, wire.MaxBlockLocatorsPerMsg)

View file

@ -212,6 +212,23 @@ func (sp *serverPeer) pushGetCFHeadersMsg(locator blockchain.BlockLocator,
return nil
}
// pushSendHeadersMsg sends a sendheaders message to the connected peer.
func (sp *serverPeer) pushSendHeadersMsg() error {
if sp.VersionKnown() {
if sp.ProtocolVersion() > wire.SendHeadersVersion {
sp.QueueMessage(wire.NewMsgSendHeaders(), nil)
}
}
return nil
}
// OnVerAck is invoked when a peer receives a verack bitcoin message and is used
// to send the "sendheaders" command to peers that are of a sufficienty new
// protocol version.
func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) {
sp.pushSendHeadersMsg()
}
// OnVersion is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
@ -1057,7 +1074,8 @@ func (s *ChainService) AnnounceNewTransactions( /*newTxs []*mempool.TxDesc*/ ) {
func newPeerConfig(sp *serverPeer) *peer.Config {
return &peer.Config{
Listeners: peer.MessageListeners{
OnVersion: sp.OnVersion,
OnVersion: sp.OnVersion,
//OnVerAck: sp.OnVerAck, // Don't use sendheaders yet
OnBlock: sp.OnBlock,
OnInv: sp.OnInv,
OnHeaders: sp.OnHeaders,