Merge pull request #1277 from halseth/cf-checkpoint-rlock

server: allow for read concurrency when serving cf checkpoints, fix bug in checkpoint serving
This commit is contained in:
Olaoluwa Osuntokun 2018-09-03 16:29:27 -07:00 committed by GitHub
commit cff30e1d23
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -960,7 +960,7 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
// Fetch the current existing cache so we can decide if we need to // Fetch the current existing cache so we can decide if we need to
// extend it or if its adequate as is. // extend it or if its adequate as is.
sp.server.cfCheckptCachesMtx.Lock() sp.server.cfCheckptCachesMtx.RLock()
checkptCache := sp.server.cfCheckptCaches[msg.FilterType] checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
// If the set of block hashes is beyond the current size of the cache, // If the set of block hashes is beyond the current size of the cache,
@ -969,12 +969,21 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
var updateCache bool var updateCache bool
if len(blockHashes) > len(checkptCache) { if len(blockHashes) > len(checkptCache) {
// Now that we know we'll need to modify the size of the cache, // Now that we know we'll need to modify the size of the cache,
// we'll defer the release of the write lock so we don't // we'll release the read lock and grab the write lock to
// forget. // possibly expand the cache size.
sp.server.cfCheckptCachesMtx.RUnlock()
sp.server.cfCheckptCachesMtx.Lock()
defer sp.server.cfCheckptCachesMtx.Unlock() defer sp.server.cfCheckptCachesMtx.Unlock()
// We'll mark that we need to update the cache for below and // Now that we have the write lock, we'll check again as it's
// also expand the size of the cache in place. // possible that the cache has already been expanded.
checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
// If we still need to expand the cache, then We'll mark that
// we need to update the cache for below and also expand the
// size of the cache in place.
if len(blockHashes) > len(checkptCache) {
updateCache = true updateCache = true
additionalLength := len(blockHashes) - len(checkptCache) additionalLength := len(blockHashes) - len(checkptCache)
@ -987,16 +996,14 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
sp.server.cfCheckptCaches[msg.FilterType], sp.server.cfCheckptCaches[msg.FilterType],
newEntries..., newEntries...,
) )
}
} else { } else {
// Otherwise, we'll release the write lock, then grab the read // Otherwise, we'll hold onto the read lock for the remainder
// lock, as the cache is already properly sized. // of this method.
sp.server.cfCheckptCachesMtx.Unlock() defer sp.server.cfCheckptCachesMtx.RUnlock()
sp.server.cfCheckptCachesMtx.RLock()
peerLog.Tracef("Serving stale cache of size %v", peerLog.Tracef("Serving stale cache of size %v",
len(checkptCache)) len(checkptCache))
defer sp.server.cfCheckptCachesMtx.RUnlock()
} }
// Now that we know the cache is of an appropriate size, we'll iterate // Now that we know the cache is of an appropriate size, we'll iterate
@ -1004,7 +1011,7 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
// a re-org has occurred so items in the db are now in the main china // a re-org has occurred so items in the db are now in the main china
// while the cache has been partially invalidated. // while the cache has been partially invalidated.
var forkIdx int var forkIdx int
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- { for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] { if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
break break
} }