Fix some issues with cfheaders sync; reorgs still don't work.

This commit is contained in:
Alex 2017-04-13 20:38:38 -06:00 committed by Olaoluwa Osuntokun
parent 87759a142f
commit be44a3ea88
2 changed files with 111 additions and 44 deletions

View file

@ -147,9 +147,12 @@ type blockManager struct {
requestedBlocks map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{}
progressLogger *blockProgressLogger progressLogger *blockProgressLogger
syncPeer *serverPeer syncPeer *serverPeer
msgChan chan interface{} // Channel for messages that come from peers
wg sync.WaitGroup peerChan chan interface{}
quit chan struct{} // Channel for messages that come from internal commands
intChan chan interface{}
wg sync.WaitGroup
quit chan struct{}
headerList *list.List headerList *list.List
reorgList *list.List reorgList *list.List
@ -157,9 +160,10 @@ type blockManager struct {
nextCheckpoint *chaincfg.Checkpoint nextCheckpoint *chaincfg.Checkpoint
lastRequested chainhash.Hash lastRequested chainhash.Hash
basicHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer basicHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer
extendedHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer lastBasicCFHeaderHeight int32
lastFilterHeight int32 extendedHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer
lastExtCFHeaderHeight int32
minRetargetTimespan int64 // target timespan / adjustment factor minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor
@ -177,7 +181,8 @@ func newBlockManager(s *ChainService) (*blockManager, error) {
server: s, server: s,
requestedBlocks: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}),
progressLogger: newBlockProgressLogger("Processed", log), progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, MaxPeers*3), peerChan: make(chan interface{}, MaxPeers*3),
intChan: make(chan interface{}, 1),
headerList: list.New(), headerList: list.New(),
reorgList: list.New(), reorgList: list.New(),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -236,7 +241,7 @@ func (b *blockManager) NewPeer(sp *serverPeer) {
if atomic.LoadInt32(&b.shutdown) != 0 { if atomic.LoadInt32(&b.shutdown) != 0 {
return return
} }
b.msgChan <- &newPeerMsg{peer: sp} b.peerChan <- &newPeerMsg{peer: sp}
} }
// handleNewPeerMsg deals with new peers that have signalled they may // handleNewPeerMsg deals with new peers that have signalled they may
@ -269,7 +274,7 @@ func (b *blockManager) DonePeer(sp *serverPeer) {
return return
} }
b.msgChan <- &donePeerMsg{peer: sp} b.peerChan <- &donePeerMsg{peer: sp}
} }
// handleDonePeerMsg deals with peers that have signalled they are done. It // handleDonePeerMsg deals with peers that have signalled they are done. It
@ -318,8 +323,23 @@ func (b *blockManager) blockHandler() {
candidatePeers := list.New() candidatePeers := list.New()
out: out:
for { for {
// Check internal messages channel first and continue if there's
// nothing to process.
select { select {
case m := <-b.msgChan: case m := <-b.intChan:
switch msg := m.(type) {
case *processCFHeadersMsg:
b.handleProcessCFHeadersMsg(msg)
default:
log.Warnf("Invalid message type in block "+
"handler: %T", msg)
}
default:
}
// Now check peer messages and quit channels.
select {
case m := <-b.peerChan:
switch msg := m.(type) { switch msg := m.(type) {
case *newPeerMsg: case *newPeerMsg:
b.handleNewPeerMsg(candidatePeers, msg.peer) b.handleNewPeerMsg(candidatePeers, msg.peer)
@ -343,9 +363,6 @@ out:
case *donePeerMsg: case *donePeerMsg:
b.handleDonePeerMsg(candidatePeers, msg.peer) b.handleDonePeerMsg(candidatePeers, msg.peer)
case *processCFHeadersMsg:
b.handleProcessCFHeadersMsg(msg)
case getSyncPeerMsg: case getSyncPeerMsg:
msg.reply <- b.syncPeer msg.reply <- b.syncPeer
@ -381,6 +398,9 @@ out:
log.Trace("Block handler done") log.Trace("Block handler done")
} }
// queueHandler reads the message channel and queues the message. This allows
// lookahead checks in
// isSyncCandidate returns whether or not the peer is a candidate to consider // isSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from. // syncing from.
func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
@ -597,7 +617,7 @@ func (b *blockManager) current() bool {
// the connected peers. // the connected peers.
func (b *blockManager) IsCurrent() bool { func (b *blockManager) IsCurrent() bool {
reply := make(chan bool) reply := make(chan bool)
b.msgChan <- isCurrentMsg{reply: reply} b.peerChan <- isCurrentMsg{reply: reply}
return <-reply return <-reply
} }
@ -609,7 +629,7 @@ func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) {
return return
} }
b.msgChan <- &invMsg{inv: inv, peer: sp} b.peerChan <- &invMsg{inv: inv, peer: sp}
} }
// handleInvMsg handles inv messages from all peers. // handleInvMsg handles inv messages from all peers.
@ -701,7 +721,7 @@ func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) {
return return
} }
b.msgChan <- &headersMsg{headers: headers, peer: sp} b.peerChan <- &headersMsg{headers: headers, peer: sp}
} }
// handleHeadersMsg handles headers messages from all peers. // handleHeadersMsg handles headers messages from all peers.
@ -1038,7 +1058,7 @@ func (b *blockManager) QueueCFHeaders(cfheaders *wire.MsgCFHeaders,
return return
} }
b.msgChan <- &cfheadersMsg{cfheaders: cfheaders, peer: sp} b.peerChan <- &cfheadersMsg{cfheaders: cfheaders, peer: sp}
} }
// handleCFHeadersMsg handles cfheaders messages from all peers. // handleCFHeadersMsg handles cfheaders messages from all peers.
@ -1098,13 +1118,14 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) {
) )
el = el.Prev() el = el.Prev()
} }
b.msgChan <- &processCFHeadersMsg{ b.intChan <- &processCFHeadersMsg{
earliestNode: node, earliestNode: node,
stopHash: req.stopHash, stopHash: req.stopHash,
extended: req.extended, extended: req.extended,
} }
log.Tracef("Processed cfheaders starting at %s, ending at %s", log.Tracef("Processed cfheaders starting at %s, ending at %s, from "+
node.header.BlockHash(), req.stopHash) "peer %s, extended: %t", node.header.BlockHash(), req.stopHash,
cfhmsg.peer.Addr(), req.extended)
} }
// handleProcessCFHeadersMsg checks to see if we have enough cfheaders to make // handleProcessCFHeadersMsg checks to see if we have enough cfheaders to make
@ -1115,23 +1136,30 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) {
// Assume we aren't ready to make a decision about correct headers yet. // Assume we aren't ready to make a decision about correct headers yet.
ready := false ready := false
filterMap := b.basicHeaders
writeFunc := b.server.putBasicHeader
readFunc := b.server.GetBasicHeader
lastCFHeaderHeight := &b.lastBasicCFHeaderHeight
if msg.extended {
filterMap = b.extendedHeaders
writeFunc = b.server.putExtHeader
readFunc = b.server.GetExtHeader
lastCFHeaderHeight = &b.lastExtCFHeaderHeight
}
// If we have started receiving cfheaders messages for blocks farther // If we have started receiving cfheaders messages for blocks farther
// than the last set we haven't made a decision on, it's time to make // than the last set we haven't made a decision on, it's time to make
// a decision. // a decision.
if msg.earliestNode.height > b.lastFilterHeight { if msg.earliestNode.height > *lastCFHeaderHeight {
if b.lastFilterHeight != 0 { if *lastCFHeaderHeight != 0 {
ready = true ready = true
} }
b.lastFilterHeight = msg.earliestNode.height *lastCFHeaderHeight = msg.earliestNode.height
} }
// If there are no other messages left, we should go ahead and make a // If there are no other messages left, we should go ahead and make a
// decision because we have all the info we're going to get. // decision because we have all the info we're going to get.
// TODO: Instead of using just a channel to queue messages, create if len(b.peerChan) == 0 {
// another goroutine that reads the channel and appends the messages to
// a slice. Then we can check the slice for only cfheaders messages. We
// might need to add DoS protection for that.
if len(b.msgChan) == 0 {
ready = true ready = true
} }
@ -1146,14 +1174,6 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) {
// find a conflict, we have to do additional checks; otherwise, we write // find a conflict, we have to do additional checks; otherwise, we write
// the filter header to the database. // the filter header to the database.
el := b.headerList.Front() el := b.headerList.Front()
filterMap := b.basicHeaders
writeFunc := b.server.putBasicHeader
readFunc := b.server.GetBasicHeader
if msg.extended {
filterMap = b.extendedHeaders
writeFunc = b.server.putExtHeader
readFunc = b.server.GetExtHeader
}
for el != nil { for el != nil {
node := el.Value.(*headerNode) node := el.Value.(*headerNode)
hash := node.header.BlockHash() hash := node.header.BlockHash()
@ -1193,7 +1213,8 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) {
// If we've reached the end, we can return // If we've reached the end, we can return
if hash == msg.stopHash { if hash == msg.stopHash {
log.Tracef("Finished processing cfheaders messages up "+ log.Tracef("Finished processing cfheaders messages up "+
"to height %d/hash %s", node.height, hash) "to height %d/hash %s, extended: %t",
node.height, hash, msg.extended)
return return
} }
} }
@ -1208,7 +1229,7 @@ func (b *blockManager) QueueCFilter(cfilter *wire.MsgCFilter, sp *serverPeer) {
return return
} }
b.msgChan <- &cfilterMsg{cfilter: cfilter, peer: sp} b.peerChan <- &cfilterMsg{cfilter: cfilter, peer: sp}
} }
// handleCFilterMsg handles cfilter messages from all peers. // handleCFilterMsg handles cfilter messages from all peers.

View file

@ -189,11 +189,11 @@ func TestSetup(t *testing.T) {
// Generate 7 blocks on h1 and wait for ChainService to sync to the // Generate 7 blocks on h1 and wait for ChainService to sync to the
// newly-best chain on h1. // newly-best chain on h1.
h1.Node.Generate(7) /*h1.Node.Generate(7)
err = waitForSync(t, svc, h1, time.Second, 30*time.Second) err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil { if err != nil {
t.Fatalf("Couldn't sync ChainService: %s", err) t.Fatalf("Couldn't sync ChainService: %s", err)
} }*/
} }
// csd does a connect-sync-disconnect between nodes in order to support // csd does a connect-sync-disconnect between nodes in order to support
@ -234,12 +234,14 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService,
var haveBest *waddrmgr.BlockStamp var haveBest *waddrmgr.BlockStamp
haveBest, err = svc.BestSnapshot() haveBest, err = svc.BestSnapshot()
if err != nil { if err != nil {
return err return fmt.Errorf("Couldn't get best snapshot from "+
"ChainService: %s", err)
} }
var total time.Duration var total time.Duration
for haveBest.Hash != *knownBestHash { for haveBest.Hash != *knownBestHash {
if total > timeout { if total > timeout {
return fmt.Errorf("Timed out after %v.", timeout) return fmt.Errorf("Timed out after %v waiting for "+
"header synchronization.", timeout)
} }
if haveBest.Height > knownBestHeight { if haveBest.Height > knownBestHeight {
return fmt.Errorf("Synchronized to the wrong chain.") return fmt.Errorf("Synchronized to the wrong chain.")
@ -256,9 +258,53 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService,
haveBest.Hash) haveBest.Hash)
} }
} }
// Check if we're current // Check if we're current.
if !svc.IsCurrent() { if !svc.IsCurrent() {
return fmt.Errorf("ChainService doesn't see itself as current!") return fmt.Errorf("ChainService doesn't see itself as current!")
} }
return nil // Check if we have all of the cfheaders.
knownBasicHeader, err := correctSyncNode.Node.GetCFilterHeader(
knownBestHash, false)
if err != nil {
return fmt.Errorf("Couldn't get latest basic header from "+
"%s: %s", correctSyncNode.P2PAddress(), err)
}
knownExtHeader, err := correctSyncNode.Node.GetCFilterHeader(
knownBestHash, true)
if err != nil {
return fmt.Errorf("Couldn't get latest extended header from "+
"%s: %s", correctSyncNode.P2PAddress(), err)
}
for total <= timeout {
time.Sleep(checkInterval)
total += checkInterval
haveBasicHeader, err := svc.GetBasicHeader(*knownBestHash)
if err != nil {
t.Logf("Basic header unknown.")
continue
}
haveExtHeader, err := svc.GetExtHeader(*knownBestHash)
if err != nil {
t.Logf("Extended header unknown.")
continue
}
if *knownBasicHeader.HeaderHashes[0] != *haveBasicHeader {
return fmt.Errorf("Known basic header doesn't match "+
"the basic header the ChainService has. Known:"+
" %s, ChainService: %s",
knownBasicHeader.HeaderHashes[0],
haveBasicHeader)
}
if *knownExtHeader.HeaderHashes[0] != *haveExtHeader {
return fmt.Errorf("Known extended header doesn't "+
"match the extended header the ChainService "+
"has. Known: %s, ChainService: %s",
knownExtHeader.HeaderHashes[0], haveExtHeader)
}
t.Logf("Synced cfheaders to %d (%s)", haveBest.Height,
haveBest.Hash)
return nil
}
return fmt.Errorf("Timeout waiting for cfheaders synchronization after"+
" %v", timeout)
} }