server: fix bug in cf checkpoint serving

In this commit, we fix a bug in the way we previously attempted to
serve cf checkpoints. In the prior version we would never actually
fetch the current length of the cache. As a result, after the first
time the checkpoints were fetched, we would continually grow the cache
rather than reusing what was already there when it was sufficient.

In this commit, we fix this behavior by always checking the current
length of the cache first, then either keeping the write lock (when the
cache needs to grow) or downgrading to a read lock when the existing
size is sufficient.
Olaoluwa Osuntokun 2018-07-14 20:50:58 -07:00
parent f7366fb51b
commit 5e86c37411

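To make the locking change easier to follow, here is a minimal, self-contained sketch of the pattern the fix adopts. The headerKV type, string hashes, and serveCheckpoints function below are simplified stand-ins rather than btcd's actual types or API; the point is only the flow: take the write lock, check the cache's current length, and only then decide whether to keep the write lock to grow the cache or downgrade to a read lock.

package main

import (
    "fmt"
    "sync"
)

// headerKV mirrors the idea of the cache entries in the patch: a block
// hash paired with its committed filter header (types simplified here).
type headerKV struct {
    blockHash    string
    filterHeader string
}

var (
    checkptCacheMtx sync.RWMutex
    checkptCache    []headerKV
)

// serveCheckpoints illustrates the fixed flow: take the write lock, read
// the cache's *current* length, then either keep the write lock (the
// cache must grow) or downgrade to a read lock (the cache is already
// big enough).
func serveCheckpoints(blockHashes []string) []headerKV {
    checkptCacheMtx.Lock()

    if len(blockHashes) > len(checkptCache) {
        // The cache is too short: hold the write lock for the rest of
        // the call and expand the cache in place.
        defer checkptCacheMtx.Unlock()

        additional := len(blockHashes) - len(checkptCache)
        checkptCache = append(checkptCache, make([]headerKV, additional)...)
    } else {
        // The cache already covers the request: release the write lock
        // and downgrade to a read lock for the rest of the call.
        checkptCacheMtx.Unlock()
        checkptCacheMtx.RLock()
        defer checkptCacheMtx.RUnlock()
    }

    // In the real handler the checkpoint message would be populated here,
    // writing into checkptCache only on the write-lock path.
    return checkptCache[:len(blockHashes)]
}

func main() {
    // The first call grows the cache to three entries; the second call
    // finds the cache already large enough and serves it under the read
    // lock.
    fmt.Println(len(serveCheckpoints([]string{"h1", "h2", "h3"})))
    fmt.Println(len(serveCheckpoints([]string{"h1", "h2"})))
}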

@@ -947,6 +947,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         return
     }
 
+    // Now that we know the client is fetching a filter that we know of,
+    // we'll fetch the block hashes at each check point interval so we can
+    // compare against our cache, and create new check points if necessary.
     blockHashes, err := sp.server.chain.IntervalBlockHashes(
         &msg.StopHash, wire.CFCheckptInterval,
     )
@@ -955,43 +958,55 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         return
     }
 
-    var (
-        updateCache  bool
-        checkptCache []cfHeaderKV
+    checkptMsg := wire.NewMsgCFCheckpt(
+        msg.FilterType, &msg.StopHash, len(blockHashes),
     )
 
-    // If the set of check points requested goes back further than what
-    // we've already generated in our cache, then we'll need to update it.
-    if len(blockHashes) > len(checkptCache) {
-        // Update the cache if the checkpoint chain is longer than the
-        // cached one. This ensures that the cache is relatively stable
-        // and mostly overlaps with the best chain, since it follows
-        // the longest chain heuristic.
-        updateCache = true
-
-        // Take write lock because we are going to update cache.
-        sp.server.cfCheckptCachesMtx.Lock()
+    // Fetch the current existing cache so we can decide if we need to
+    // extend it or if it's adequate as is.
+    sp.server.cfCheckptCachesMtx.Lock()
+    checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
+
+    // If the set of block hashes is beyond the current size of the cache,
+    // then we'll expand the size of the cache and also retain the write
+    // lock.
+    var updateCache bool
+    if len(blockHashes) > len(checkptCache) {
+        // Now that we know we'll need to modify the size of the cache,
+        // we'll defer the release of the write lock so we don't
+        // forget.
         defer sp.server.cfCheckptCachesMtx.Unlock()
 
-        // Grow the checkptCache to be the length of blockHashes.
+        // We'll mark that we need to update the cache for below and
+        // also expand the size of the cache in place.
+        updateCache = true
+
         additionalLength := len(blockHashes) - len(checkptCache)
+        newEntries := make([]cfHeaderKV, additionalLength)
+
+        peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
+            "block hashes", len(checkptCache), len(blockHashes))
+
         checkptCache = append(
             sp.server.cfCheckptCaches[msg.FilterType],
-            make([]cfHeaderKV, additionalLength)...,
+            newEntries...,
         )
     } else {
-        // Otherwise, we don't need to update the cache as we already
-        // have enough headers pre-generated.
-        updateCache = false
-
-        // Take reader lock because we are not going to update cache.
+        // Otherwise, we'll release the write lock, then grab the read
+        // lock, as the cache is already properly sized.
+        sp.server.cfCheckptCachesMtx.Unlock()
         sp.server.cfCheckptCachesMtx.RLock()
-        defer sp.server.cfCheckptCachesMtx.RUnlock()
 
-        checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
+        peerLog.Tracef("Serving stale cache of size %v",
+            len(checkptCache))
+
+        defer sp.server.cfCheckptCachesMtx.RUnlock()
     }
 
-    // Iterate backwards until the block hash is found in the cache.
+    // Now that we know the cache is of an appropriate size, we'll iterate
+    // backwards until we find the block hash. We do this as it's possible
+    // a re-org has occurred so items in the db are now in the main chain
+    // while the cache has been partially invalidated.
     var forkIdx int
     for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
         if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
@@ -999,20 +1014,19 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         }
     }
 
-    // Populate results with cached checkpoints.
-    checkptMsg := wire.NewMsgCFCheckpt(
-        msg.FilterType, &msg.StopHash, len(blockHashes),
-    )
+    // Now that we know how much of the cache is relevant for this
+    // query, we'll populate our check point message with the cache as is.
+    // Shortly below, we'll populate the new elements of the cache.
     for i := 0; i < forkIdx; i++ {
         checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
     }
 
-    // Look up any filter headers that aren't cached.
+    // We'll now collect the set of hashes that are beyond our cache so we
+    // can look up the filter headers to populate the final cache.
     blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
     for i := forkIdx; i < len(blockHashes); i++ {
         blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
     }
 
     filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
         blockHashPtrs, msg.FilterType,
     )
@@ -1021,6 +1035,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         return
     }
 
+    // Now that we have the full set of filter headers, we'll add them to
+    // the checkpoint message, and also update our cache in line.
     for i, filterHeaderBytes := range filterHeaders {
         if len(filterHeaderBytes) == 0 {
             peerLog.Warnf("Could not obtain CF header for %v",
@@ -1036,6 +1052,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         }
 
         checkptMsg.AddCFHeader(filterHeader)
+
+        // If the new main chain is longer than what's in the cache,
+        // then we'll override it beyond the fork point.
         if updateCache {
             checkptCache[forkIdx+i] = cfHeaderKV{
                 blockHash: blockHashes[forkIdx+i],
@@ -1044,6 +1063,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
         }
     }
 
+    // Finally, we'll update the cache if we need to, and send the final
+    // message back to the requesting peer.
     if updateCache {
         sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
     }
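For context on the fork-point scan in the hunks above, here is a rough standalone sketch of the same idea, again with the simplified headerKV type and string hashes, so it is illustrative rather than btcd's actual code. The one deliberate difference from the handler is that the scan starts from the shorter of the two slices, which keeps the sketch panic-free if the cache happens to be longer than the requested range.

package main

import "fmt"

// headerKV is the same simplified cache entry used in the earlier sketch:
// a block hash paired with its committed filter header (hypothetical types).
type headerKV struct {
    blockHash    string
    filterHeader string
}

// findForkIdx walks the cache backwards until a cached entry agrees with
// the hash the chain currently reports at that checkpoint interval.
// Entries at or beyond the returned index are treated as invalidated by a
// re-org and must be re-fetched; entries before it can be served straight
// from the cache.
func findForkIdx(cache []headerKV, blockHashes []string) int {
    forkIdx := len(cache)
    if len(blockHashes) < forkIdx {
        forkIdx = len(blockHashes)
    }
    for ; forkIdx > 0; forkIdx-- {
        if cache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
            break
        }
    }
    return forkIdx
}

func main() {
    cache := []headerKV{
        {blockHash: "a"}, {blockHash: "b"}, {blockHash: "c-stale"},
    }
    // The chain now reports a different hash at the third interval, so
    // only the first two cached checkpoints are reusable.
    fmt.Println(findForkIdx(cache, []string{"a", "b", "c"})) // prints 2
}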