server: modify locking in OnGetCFCheckpt to allow for read concurrency

In this commit, we modify the locking scheme used when serving cf
checkpoints to peers so that the server can serve multiple peers at the
same time. Before this commit, we would first grab the write lock,
check whether the cache needed to be expanded, and then downgrade to
the read lock if it did not. In this commit we reverse this: we grab
the read lock first, and upgrade to the write lock only if we actually
need to expand the size of the cache.
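For reference, here is a minimal sketch of the read-then-upgrade pattern in
isolation (the cache type and ensureSize method are hypothetical stand-ins,
not the actual btcd code). Since Go's sync.RWMutex has no atomic upgrade,
the RUnlock/Lock swap opens a window in which another goroutine can expand
the cache first, so the size check must be repeated under the write lock:

package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical stand-in for the server's checkpoint cache.
type cache struct {
	mtx     sync.RWMutex
	entries []int
}

// ensureSize grows entries to at least n, taking the write lock only
// when an expansion is actually required.
func (c *cache) ensureSize(n int) {
	c.mtx.RLock()
	if n <= len(c.entries) {
		// Common case: the cache is already large enough, so the
		// request can be served under the shared read lock.
		c.mtx.RUnlock()
		return
	}

	// Upgrade: release the read lock, then take the write lock.
	// RWMutex has no atomic upgrade, so other goroutines may run
	// in between these two calls.
	c.mtx.RUnlock()
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Re-check under the write lock: another goroutine may have
	// already expanded the cache during the unlocked window.
	if n > len(c.entries) {
		c.entries = append(c.entries, make([]int, n-len(c.entries))...)
	}
}

func main() {
	c := &cache{}
	c.ensureSize(4)
	fmt.Println(len(c.entries)) // 4
}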
Olaoluwa Osuntokun 2018-08-27 17:46:35 -07:00
parent 79e00513b1
commit c8e6363e22

@@ -960,7 +960,7 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 
 	// Fetch the current existing cache so we can decide if we need to
 	// extend it or if its adequate as is.
-	sp.server.cfCheckptCachesMtx.Lock()
+	sp.server.cfCheckptCachesMtx.RLock()
 	checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
 
 	// If the set of block hashes is beyond the current size of the cache,
@@ -969,34 +969,41 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 	var updateCache bool
 	if len(blockHashes) > len(checkptCache) {
 		// Now that we know we'll need to modify the size of the cache,
-		// we'll defer the release of the write lock so we don't
-		// forget.
+		// we'll release the read lock and grab the write lock to
+		// possibly expand the cache size.
+		sp.server.cfCheckptCachesMtx.RUnlock()
+		sp.server.cfCheckptCachesMtx.Lock()
 		defer sp.server.cfCheckptCachesMtx.Unlock()
 
-		// We'll mark that we need to update the cache for below and
-		// also expand the size of the cache in place.
-		updateCache = true
+		// Now that we have the write lock, we'll check again as it's
+		// possible that the cache has already been expanded.
+		checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
 
-		additionalLength := len(blockHashes) - len(checkptCache)
-		newEntries := make([]cfHeaderKV, additionalLength)
+		// If we still need to expand the cache, then we'll mark that
+		// we need to update the cache for below and also expand the
+		// size of the cache in place.
+		if len(blockHashes) > len(checkptCache) {
+			updateCache = true
 
-		peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
-			"block hashes", len(checkptCache), len(blockHashes))
+			additionalLength := len(blockHashes) - len(checkptCache)
+			newEntries := make([]cfHeaderKV, additionalLength)
 
-		checkptCache = append(
-			sp.server.cfCheckptCaches[msg.FilterType],
-			newEntries...,
-		)
+			peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
+				"block hashes", len(checkptCache), len(blockHashes))
+
+			checkptCache = append(
+				sp.server.cfCheckptCaches[msg.FilterType],
+				newEntries...,
+			)
+		}
 	} else {
-		// Otherwise, we'll release the write lock, then grab the read
-		// lock, as the cache is already properly sized.
-		sp.server.cfCheckptCachesMtx.Unlock()
-		sp.server.cfCheckptCachesMtx.RLock()
+		// Otherwise, we'll hold onto the read lock for the remainder
+		// of this method.
+		defer sp.server.cfCheckptCachesMtx.RUnlock()
 
 		peerLog.Tracef("Serving stale cache of size %v",
 			len(checkptCache))
-		defer sp.server.cfCheckptCachesMtx.RUnlock()
 	}
 
 	// Now that we know the cache is of an appropriate size, we'll iterate
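To see why the second length check under the write lock matters, the
hypothetical sketch from above can be driven by several concurrent callers
(swap this function in for main in that sketch; running with the race
detector confirms the locking is sound). Only the first goroutine to win
the write lock grows the slice; the others re-check, observe the already
expanded cache, and skip the append:

// serveMany is a hypothetical concurrent driver for the ensureSize
// sketch above: eight goroutines race to expand the same cache.
func serveMany() {
	c := &cache{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.ensureSize(1024)
		}()
	}
	wg.Wait()
	fmt.Println(len(c.entries)) // 1024: expanded exactly once
}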