diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index 89f5873..307f63f 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -164,6 +164,7 @@ type blockManager struct { lastBasicCFHeaderHeight int32 extendedHeaders map[chainhash.Hash]map[chainhash.Hash][]*serverPeer lastExtCFHeaderHeight int32 + mapMutex sync.Mutex minRetargetTimespan int64 // target timespan / adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor @@ -468,24 +469,28 @@ func (b *blockManager) resetHeaderState(newestHeader *wire.BlockHeader, newestHeight int32) { b.headerList.Init() b.startHeader = nil + b.mapMutex.Lock() b.basicHeaders = make( map[chainhash.Hash]map[chainhash.Hash][]*serverPeer, ) b.extendedHeaders = make( map[chainhash.Hash]map[chainhash.Hash][]*serverPeer, ) + b.mapMutex.Unlock() // Add an entry for the latest known block into the header pool. // This allows the next downloaded header to prove it links to the chain // properly. node := headerNode{header: newestHeader, height: newestHeight} b.headerList.PushBack(&node) + b.mapMutex.Lock() b.basicHeaders[newestHeader.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) b.extendedHeaders[newestHeader.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) + b.mapMutex.Unlock() } // startSync will choose the best peer among the available candidate peers to @@ -790,12 +795,14 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } hmsg.peer.UpdateLastBlockHeight(node.height) e := b.headerList.PushBack(&node) + b.mapMutex.Lock() b.basicHeaders[node.header.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) b.extendedHeaders[node.header.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) + b.mapMutex.Unlock() if b.startHeader == nil { b.startHeader = e } @@ -938,12 +945,14 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { b.syncPeer = hmsg.peer b.server.rollbackToHeight(backHeight) b.server.putBlock(*blockHeader, backHeight+1) + b.mapMutex.Lock() b.basicHeaders[node.header.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) b.extendedHeaders[node.header.BlockHash()] = make( map[chainhash.Hash][]*serverPeer, ) + b.mapMutex.Unlock() b.server.putMaxBlockHeight(backHeight + 1) b.resetHeaderState(&backHead, int32(backHeight)) b.headerList.PushBack(&headerNode{ @@ -1113,9 +1122,11 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { break } // Process this header and set up the next iteration. + b.mapMutex.Lock() filterMap[hash][*headerList[i]] = append( filterMap[hash][*headerList[i]], cfhmsg.peer, ) + b.mapMutex.Unlock() el = el.Prev() } b.intChan <- &processCFHeadersMsg{ @@ -1178,6 +1189,7 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { node := el.Value.(*headerNode) hash := node.header.BlockHash() if node.height >= msg.earliestNode.height { + b.mapMutex.Lock() blockMap := filterMap[hash] switch len(blockMap) { // This should only happen if the filter has already @@ -1203,6 +1215,7 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { // TODO: Handle this case. default: } + b.mapMutex.Unlock() } //elToRemove := el diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index 6df9dc5..b684289 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -181,7 +181,7 @@ func TestSetup(t *testing.T) { // Generate 5 blocks on h2 and wait for ChainService to sync to the // newly-best chain on h2. - h2.Node.Generate(5) + /*h2.Node.Generate(5) err = waitForSync(t, svc, h2, time.Second, 30*time.Second) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) @@ -189,7 +189,7 @@ func TestSetup(t *testing.T) { // Generate 7 blocks on h1 and wait for ChainService to sync to the // newly-best chain on h1. - /*h1.Node.Generate(7) + h1.Node.Generate(7) err = waitForSync(t, svc, h1, time.Second, 30*time.Second) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) @@ -280,12 +280,16 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService, total += checkInterval haveBasicHeader, err := svc.GetBasicHeader(*knownBestHash) if err != nil { - t.Logf("Basic header unknown.") + if logLevel != btclog.Off { + t.Logf("Basic header unknown.") + } continue } haveExtHeader, err := svc.GetExtHeader(*knownBestHash) if err != nil { - t.Logf("Extended header unknown.") + if logLevel != btclog.Off { + t.Logf("Extended header unknown.") + } continue } if *knownBasicHeader.HeaderHashes[0] != *haveBasicHeader { @@ -301,8 +305,67 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService, "has. Known: %s, ChainService: %s", knownExtHeader.HeaderHashes[0], haveExtHeader) } - t.Logf("Synced cfheaders to %d (%s)", haveBest.Height, - haveBest.Hash) + // At this point, we know the latest cfheader is stored in the + // ChainService database. We now compare each cfheader the + // harness knows about to what's stored in the ChainService + // database to see if we've missed anything or messed anything + // up. + for i := int32(0); i <= haveBest.Height; i++ { + head, _, err := svc.GetBlockByHeight(uint32(i)) + if err != nil { + return fmt.Errorf("Couldn't read block by "+ + "height: %s", err) + } + hash := head.BlockHash() + haveBasicHeader, err := svc.GetBasicHeader(hash) + if err != nil { + return fmt.Errorf("Couldn't get basic header "+ + "for %d (%s) from DB", i, hash) + } + haveExtHeader, err := svc.GetExtHeader(hash) + if err != nil { + return fmt.Errorf("Couldn't get extended "+ + "header for %d (%s) from DB", i, hash) + } + knownBasicHeader, err := + correctSyncNode.Node.GetCFilterHeader(&hash, + false) + if err != nil { + return fmt.Errorf("Couldn't get basic header "+ + "for %d (%s) from node %s", i, hash, + correctSyncNode.P2PAddress()) + } + knownExtHeader, err := + correctSyncNode.Node.GetCFilterHeader(&hash, + true) + if err != nil { + return fmt.Errorf("Couldn't get extended "+ + "header for %d (%s) from node %s", i, + hash, correctSyncNode.P2PAddress()) + } + if *haveBasicHeader != + *knownBasicHeader.HeaderHashes[0] { + return fmt.Errorf("Basic header for %d (%s) "+ + "doesn't match node %s. DB: %s, node: "+ + "%s", i, hash, + correctSyncNode.P2PAddress(), + haveBasicHeader, + knownBasicHeader.HeaderHashes[0]) + } + if *haveExtHeader != + *knownExtHeader.HeaderHashes[0] { + return fmt.Errorf("Extended header for %d (%s)"+ + " doesn't match node %s. DB: %s, node:"+ + " %s", i, hash, + correctSyncNode.P2PAddress(), + haveExtHeader, + knownExtHeader.HeaderHashes[0]) + } + } + if logLevel != btclog.Off { + t.Logf("Synced cfheaders to %d (%s)", haveBest.Height, + haveBest.Hash) + } return nil } return fmt.Errorf("Timeout waiting for cfheaders synchronization after"+