diff --git a/server.go b/server.go
index b19cb454..f3e4c9f7 100644
--- a/server.go
+++ b/server.go
@@ -186,6 +186,13 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
 	ps.forAllOutboundPeers(closure)
 }
 
+// cfHeaderKV is a tuple of a filter header and its associated block hash. The
+// struct is used to cache cfcheckpt responses.
+type cfHeaderKV struct {
+	blockHash    chainhash.Hash
+	filterHeader chainhash.Hash
+}
+
 // server provides a bitcoin server for handling communications to and from
 // bitcoin peers.
 type server struct {
@@ -234,6 +241,11 @@ type server struct {
 	// The fee estimator keeps track of how long transactions are left in
 	// the mempool before they are mined into blocks.
 	feeEstimator *mempool.FeeEstimator
+
+	// cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
+	// messages for each filter type.
+	cfCheckptCaches    map[wire.FilterType][]cfHeaderKV
+	cfCheckptCachesMtx sync.RWMutex
 }
 
 // serverPeer extends the peer to maintain state shared by the server and
@@ -883,6 +895,113 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
 	sp.QueueMessage(headersMsg, nil)
 }
 
+// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
+func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
+	// Ignore getcfcheckpt requests if not in sync.
+	if !sp.server.syncManager.IsCurrent() {
+		return
+	}
+
+	blockHashes, err := sp.server.chain.IntervalBlockHashes(&msg.StopHash,
+		wire.CFCheckptInterval)
+	if err != nil {
+		peerLog.Debugf("Invalid getcfcheckpt request: %v", err)
+		return
+	}
+
+	// Fetch the currently cached checkpoints for this filter type so we
+	// can decide whether the cache needs to be extended.
+	sp.server.cfCheckptCachesMtx.RLock()
+	checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
+	sp.server.cfCheckptCachesMtx.RUnlock()
+
+	var updateCache bool
+	if len(blockHashes) > len(checkptCache) {
+		// Update the cache if the checkpoint chain is longer than the cached
+		// one. This ensures that the cache is relatively stable and mostly
+		// overlaps with the best chain, since it follows the longest chain
+		// heuristic.
+		updateCache = true
+
+		// Take write lock because we are going to update cache.
+		sp.server.cfCheckptCachesMtx.Lock()
+		defer sp.server.cfCheckptCachesMtx.Unlock()
+
+		// Grow the checkptCache to be the length of blockHashes,
+		// re-reading it now that the write lock is held.
+		checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
+		additionalLength := len(blockHashes) - len(checkptCache)
+		if additionalLength > 0 {
+			checkptCache = append(checkptCache,
+				make([]cfHeaderKV, additionalLength)...)
+		}
+	} else {
+		updateCache = false
+
+		// Take reader lock because we are not going to update cache.
+		sp.server.cfCheckptCachesMtx.RLock()
+		defer sp.server.cfCheckptCachesMtx.RUnlock()
+
+		checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
+	}
+
+	// Iterate backwards until the block hash is found in the cache.
+	var forkIdx int
+	for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
+		if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
+			break
+		}
+	}
+
+	// Populate results with cached checkpoints.
+	checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, &msg.StopHash,
+		len(blockHashes))
+	for i := 0; i < forkIdx; i++ {
+		checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
+	}
+
+	// Look up any filter headers that aren't cached.
+	blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
+	for i := forkIdx; i < len(blockHashes); i++ {
+		blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
+	}
+
+	filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
+		msg.FilterType)
+	if err != nil {
+		peerLog.Errorf("Error retrieving cfilter headers: %v", err)
+		return
+	}
+
+	for i, filterHeaderBytes := range filterHeaders {
+		if len(filterHeaderBytes) == 0 {
+			peerLog.Warnf("Could not obtain CF header for %v", blockHashPtrs[i])
+			return
+		}
+
+		filterHeader, err := chainhash.NewHash(filterHeaderBytes)
+		if err != nil {
+			peerLog.Warnf("Committed filter header deserialize "+
+				"failed: %v", err)
+			return
+		}
+
+		checkptMsg.AddCFHeader(filterHeader)
+		if updateCache {
+			checkptCache[forkIdx+i] = cfHeaderKV{
+				blockHash:    blockHashes[forkIdx+i],
+				filterHeader: *filterHeader,
+			}
+		}
+	}
+
+	if updateCache {
+		sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
+	}
+
+	sp.QueueMessage(checkptMsg, nil)
+}
+
 // enforceNodeBloomFlag disconnects the peer if the server is not configured to
 // allow bloom filters. Additionally, if the peer has negotiated to a protocol
 // version that is high enough to observe the bloom filter service support bit,
@@ -1732,6 +1851,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
 			OnGetHeaders:   sp.OnGetHeaders,
 			OnGetCFilters:  sp.OnGetCFilters,
 			OnGetCFHeaders: sp.OnGetCFHeaders,
+			OnGetCFCheckpt: sp.OnGetCFCheckpt,
 			OnFeeFilter:    sp.OnFeeFilter,
 			OnFilterAdd:    sp.OnFilterAdd,
 			OnFilterClear:  sp.OnFilterClear,
@@ -2351,6 +2471,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
 		services:             services,
 		sigCache:             txscript.NewSigCache(cfg.SigCacheMaxSize),
 		hashCache:            txscript.NewHashCache(cfg.SigCacheMaxSize),
+		cfCheckptCaches:      make(map[wire.FilterType][]cfHeaderKV),
 	}
 
 	// Create the transaction and address indexes if needed.
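For reference, the request this handler serves is built on the requesting side with btcd's wire package. The snippet below is a minimal, illustrative sketch and is not part of this change: the zero-valued stop hash is a placeholder for a real block hash, and printing stands in for queueing the message on an established peer connection.

```go
package main

import (
	"fmt"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
)

func main() {
	// Placeholder stop hash; a real client would use the hash of the
	// block it wants checkpoints up to (e.g. its current tip).
	var stopHash chainhash.Hash

	// Request filter header checkpoints, spaced wire.CFCheckptInterval
	// blocks apart, for the regular compact filter type. A node running
	// the OnGetCFCheckpt handler above answers with a cfcheckpt message.
	msg := wire.NewMsgGetCFCheckpt(wire.GCSFilterRegular, &stopHash)

	fmt.Printf("getcfcheckpt: filter type %v, stop hash %v, interval %d\n",
		msg.FilterType, msg.StopHash, wire.CFCheckptInterval)
}
```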