diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index 307f63f..e9612e2 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -162,8 +162,10 @@ type blockManager struct { basicHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer lastBasicCFHeaderHeight int32 + numBasicCFHeadersMsgs int32 extendedHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer lastExtCFHeaderHeight int32 + numExtCFHeadersMsgs int32 mapMutex sync.Mutex minRetargetTimespan int64 // target timespan / adjustment factor @@ -1067,6 +1069,13 @@ func (b *blockManager) QueueCFHeaders(cfheaders *wire.MsgCFHeaders, return } + // Track number of pending cfheaders messsages for both basic and + // extended filters. + pendingMsgs := &b.numBasicCFHeadersMsgs + if cfheaders.Extended { + pendingMsgs = &b.numExtCFHeadersMsgs + } + atomic.AddInt32(pendingMsgs, 1) b.peerChan <- &cfheadersMsg{cfheaders: cfheaders, peer: sp} } @@ -1079,7 +1088,14 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { extended: cfhmsg.cfheaders.Extended, stopHash: cfhmsg.cfheaders.StopHash, } + headerMap := b.basicHeaders + pendingMsgs := &b.numBasicCFHeadersMsgs + if req.extended { + headerMap = b.extendedHeaders + pendingMsgs = &b.numExtCFHeadersMsgs + } defer delete(cfhmsg.peer.requestedCFHeaders, req) + defer atomic.AddInt32(pendingMsgs, -1) // Check that the count is correct. This works even when the map lookup // fails as it returns 0 in that case. headerList := cfhmsg.cfheaders.HeaderHashes @@ -1106,10 +1122,6 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { return } // Cycle through the filter header hashes and process them. - filterMap := b.basicHeaders - if req.extended { - filterMap = b.extendedHeaders - } var node *headerNode var hash chainhash.Hash for i := respLen - 1; i >= 0 && el != nil; i-- { @@ -1118,13 +1130,13 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { // the database. Either way, break processing. node = el.Value.(*headerNode) hash = node.header.BlockHash() - if _, ok := filterMap[hash]; !ok { + b.mapMutex.Lock() + if _, ok := headerMap[hash]; !ok { break } // Process this header and set up the next iteration. - b.mapMutex.Lock() - filterMap[hash][*headerList[i]] = append( - filterMap[hash][*headerList[i]], cfhmsg.peer, + headerMap[hash][*headerList[i]] = append( + headerMap[hash][*headerList[i]], cfhmsg.peer, ) b.mapMutex.Unlock() el = el.Prev() @@ -1147,30 +1159,30 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { // Assume we aren't ready to make a decision about correct headers yet. ready := false - filterMap := b.basicHeaders + headerMap := b.basicHeaders writeFunc := b.server.putBasicHeader readFunc := b.server.GetBasicHeader lastCFHeaderHeight := &b.lastBasicCFHeaderHeight + pendingMsgs := &b.numBasicCFHeadersMsgs if msg.extended { - filterMap = b.extendedHeaders + headerMap = b.extendedHeaders writeFunc = b.server.putExtHeader readFunc = b.server.GetExtHeader lastCFHeaderHeight = &b.lastExtCFHeaderHeight + pendingMsgs = &b.numExtCFHeadersMsgs } // If we have started receiving cfheaders messages for blocks farther // than the last set we haven't made a decision on, it's time to make // a decision. - if msg.earliestNode.height > *lastCFHeaderHeight { - if *lastCFHeaderHeight != 0 { - ready = true - } - *lastCFHeaderHeight = msg.earliestNode.height + if msg.earliestNode.height > *lastCFHeaderHeight+1 { + ready = true } - // If there are no other messages left, we should go ahead and make a - // decision because we have all the info we're going to get. - if len(b.peerChan) == 0 { + // If there are no other cfheaders messages left for this type (basic vs + // extended), we should go ahead and make a decision because we have all + // the info we're going to get. + if atomic.LoadInt32(pendingMsgs) == 0 { ready = true } @@ -1184,13 +1196,15 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { // iterate through all of those headers, looking for conflicts. If we // find a conflict, we have to do additional checks; otherwise, we write // the filter header to the database. + log.Tracef("Begin processing cfheaders messages starting at %d (%s)", + msg.earliestNode.height, msg.earliestNode.header.BlockHash()) el := b.headerList.Front() for el != nil { node := el.Value.(*headerNode) hash := node.header.BlockHash() - if node.height >= msg.earliestNode.height { + if node.height > *lastCFHeaderHeight { b.mapMutex.Lock() - blockMap := filterMap[hash] + blockMap := headerMap[hash] switch len(blockMap) { // This should only happen if the filter has already // been written to the database or if there's a reorg. @@ -1207,13 +1221,22 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { // bamboozle us (or ALL our peers are). case 1: // This will only cycle once - for filterHash := range blockMap { - writeFunc(hash, filterHash) + for headerHash := range blockMap { + writeFunc(hash, headerHash) + log.Tracef("Wrote header for block %d "+ + "with %d cfheaders messages, "+ + "extended: %t", node.height, + len(blockMap[headerHash]), + msg.extended) } + *lastCFHeaderHeight = node.height // This is when we have conflicting information from // multiple peers. // TODO: Handle this case. default: + log.Warnf("Got more than 1 possible filter "+ + "header for block %d (%s)", node.height, + node.header.BlockHash()) } b.mapMutex.Unlock() }