server: Handler for getcfcheckpt messages.
This commit is contained in:
parent
0581e18840
commit
4d0e856ea1
1 changed files with 113 additions and 0 deletions
113
server.go
113
server.go
|
@ -186,6 +186,13 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
|
||||||
ps.forAllOutboundPeers(closure)
|
ps.forAllOutboundPeers(closure)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cfHeaderKV is a tuple of a filter header and its associated block hash. The
|
||||||
|
// struct is used to cache cfcheckpt responses.
|
||||||
|
type cfHeaderKV struct {
|
||||||
|
blockHash chainhash.Hash
|
||||||
|
filterHeader chainhash.Hash
|
||||||
|
}
|
||||||
|
|
||||||
// server provides a bitcoin server for handling communications to and from
|
// server provides a bitcoin server for handling communications to and from
|
||||||
// bitcoin peers.
|
// bitcoin peers.
|
||||||
type server struct {
|
type server struct {
|
||||||
|
@ -234,6 +241,11 @@ type server struct {
|
||||||
// The fee estimator keeps track of how long transactions are left in
|
// The fee estimator keeps track of how long transactions are left in
|
||||||
// the mempool before they are mined into blocks.
|
// the mempool before they are mined into blocks.
|
||||||
feeEstimator *mempool.FeeEstimator
|
feeEstimator *mempool.FeeEstimator
|
||||||
|
|
||||||
|
// cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
|
||||||
|
// messages for each filter type.
|
||||||
|
cfCheckptCaches map[wire.FilterType][]cfHeaderKV
|
||||||
|
cfCheckptCachesMtx sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverPeer extends the peer to maintain state shared by the server and
|
// serverPeer extends the peer to maintain state shared by the server and
|
||||||
|
@ -883,6 +895,105 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
|
||||||
sp.QueueMessage(headersMsg, nil)
|
sp.QueueMessage(headersMsg, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
|
||||||
|
func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
|
||||||
|
// Ignore getcfcheckpt requests if not in sync.
|
||||||
|
if !sp.server.syncManager.IsCurrent() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
blockHashes, err := sp.server.chain.IntervalBlockHashes(&msg.StopHash,
|
||||||
|
wire.CFCheckptInterval)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Debugf("Invalid getcfilters request: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var updateCache bool
|
||||||
|
var checkptCache []cfHeaderKV
|
||||||
|
|
||||||
|
if len(blockHashes) > len(checkptCache) {
|
||||||
|
// Update the cache if the checkpoint chain is longer than the cached
|
||||||
|
// one. This ensures that the cache is relatively stable and mostly
|
||||||
|
// overlaps with the best chain, since it follows the longest chain
|
||||||
|
// heuristic.
|
||||||
|
updateCache = true
|
||||||
|
|
||||||
|
// Take write lock because we are going to update cache.
|
||||||
|
sp.server.cfCheckptCachesMtx.Lock()
|
||||||
|
defer sp.server.cfCheckptCachesMtx.Unlock()
|
||||||
|
|
||||||
|
// Grow the checkptCache to be the length of blockHashes.
|
||||||
|
additionalLength := len(blockHashes) - len(checkptCache)
|
||||||
|
checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType],
|
||||||
|
make([]cfHeaderKV, additionalLength)...)
|
||||||
|
} else {
|
||||||
|
updateCache = false
|
||||||
|
|
||||||
|
// Take reader lock because we are not going to update cache.
|
||||||
|
sp.server.cfCheckptCachesMtx.RLock()
|
||||||
|
defer sp.server.cfCheckptCachesMtx.RUnlock()
|
||||||
|
|
||||||
|
checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate backwards until the block hash is found in the cache.
|
||||||
|
var forkIdx int
|
||||||
|
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
|
||||||
|
if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate results with cached checkpoints.
|
||||||
|
checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, &msg.StopHash,
|
||||||
|
len(blockHashes))
|
||||||
|
for i := 0; i < forkIdx; i++ {
|
||||||
|
checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up any filter headers that aren't cached.
|
||||||
|
blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
|
||||||
|
for i := forkIdx; i < len(blockHashes); i++ {
|
||||||
|
blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
|
||||||
|
msg.FilterType)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Errorf("Error retrieving cfilter headers: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, filterHeaderBytes := range filterHeaders {
|
||||||
|
if len(filterHeaderBytes) == 0 {
|
||||||
|
peerLog.Warnf("Could not obtain CF header for %v", blockHashPtrs[i])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
filterHeader, err := chainhash.NewHash(filterHeaderBytes)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Warnf("Committed filter header deserialize "+
|
||||||
|
"failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
checkptMsg.AddCFHeader(filterHeader)
|
||||||
|
if updateCache {
|
||||||
|
checkptCache[forkIdx+i] = cfHeaderKV{
|
||||||
|
blockHash: blockHashes[forkIdx+i],
|
||||||
|
filterHeader: *filterHeader,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if updateCache {
|
||||||
|
sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
|
||||||
|
}
|
||||||
|
|
||||||
|
sp.QueueMessage(checkptMsg, nil)
|
||||||
|
}
|
||||||
|
|
||||||
// enforceNodeBloomFlag disconnects the peer if the server is not configured to
|
// enforceNodeBloomFlag disconnects the peer if the server is not configured to
|
||||||
// allow bloom filters. Additionally, if the peer has negotiated to a protocol
|
// allow bloom filters. Additionally, if the peer has negotiated to a protocol
|
||||||
// version that is high enough to observe the bloom filter service support bit,
|
// version that is high enough to observe the bloom filter service support bit,
|
||||||
|
@ -1732,6 +1843,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
|
||||||
OnGetHeaders: sp.OnGetHeaders,
|
OnGetHeaders: sp.OnGetHeaders,
|
||||||
OnGetCFilters: sp.OnGetCFilters,
|
OnGetCFilters: sp.OnGetCFilters,
|
||||||
OnGetCFHeaders: sp.OnGetCFHeaders,
|
OnGetCFHeaders: sp.OnGetCFHeaders,
|
||||||
|
OnGetCFCheckpt: sp.OnGetCFCheckpt,
|
||||||
OnFeeFilter: sp.OnFeeFilter,
|
OnFeeFilter: sp.OnFeeFilter,
|
||||||
OnFilterAdd: sp.OnFilterAdd,
|
OnFilterAdd: sp.OnFilterAdd,
|
||||||
OnFilterClear: sp.OnFilterClear,
|
OnFilterClear: sp.OnFilterClear,
|
||||||
|
@ -2351,6 +2463,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
|
||||||
services: services,
|
services: services,
|
||||||
sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
|
sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
|
||||||
hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize),
|
hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize),
|
||||||
|
cfCheckptCaches: make(map[wire.FilterType][]cfHeaderKV),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the transaction and address indexes if needed.
|
// Create the transaction and address indexes if needed.
|
||||||
|
|
Loading…
Reference in a new issue