Merge pull request #1218 from wpaulino/filter-type-verification
server: ensure we only fetch filters we know of
This commit is contained in:
commit
bc3861a5a2
1 changed files with 119 additions and 41 deletions
160
server.go
160
server.go
|
@ -762,8 +762,21 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
hashes, err := sp.server.chain.HeightToHashRange(int32(msg.StartHeight),
|
// We'll also ensure that the remote party is requesting a set of
|
||||||
&msg.StopHash, wire.MaxGetCFiltersReqRange)
|
// filters that we actually currently maintain.
|
||||||
|
switch msg.FilterType {
|
||||||
|
case wire.GCSFilterRegular:
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
peerLog.Debug("Filter request for unknown filter: %v",
|
||||||
|
msg.FilterType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hashes, err := sp.server.chain.HeightToHashRange(
|
||||||
|
int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Debugf("Invalid getcfilters request: %v", err)
|
peerLog.Debugf("Invalid getcfilters request: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -776,8 +789,9 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
|
||||||
hashPtrs[i] = &hashes[i]
|
hashPtrs[i] = &hashes[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
filters, err := sp.server.cfIndex.FiltersByBlockHashes(hashPtrs,
|
filters, err := sp.server.cfIndex.FiltersByBlockHashes(
|
||||||
msg.FilterType)
|
hashPtrs, msg.FilterType,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Errorf("Error retrieving cfilters: %v", err)
|
peerLog.Errorf("Error retrieving cfilters: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -785,10 +799,14 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
|
||||||
|
|
||||||
for i, filterBytes := range filters {
|
for i, filterBytes := range filters {
|
||||||
if len(filterBytes) == 0 {
|
if len(filterBytes) == 0 {
|
||||||
peerLog.Warnf("Could not obtain cfilter for %v", hashes[i])
|
peerLog.Warnf("Could not obtain cfilter for %v",
|
||||||
|
hashes[i])
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filterMsg := wire.NewMsgCFilter(msg.FilterType, &hashes[i], filterBytes)
|
|
||||||
|
filterMsg := wire.NewMsgCFilter(
|
||||||
|
msg.FilterType, &hashes[i], filterBytes,
|
||||||
|
)
|
||||||
sp.QueueMessage(filterMsg, nil)
|
sp.QueueMessage(filterMsg, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -800,19 +818,32 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We'll also ensure that the remote party is requesting a set of
|
||||||
|
// headers for filters that we actually currently maintain.
|
||||||
|
switch msg.FilterType {
|
||||||
|
case wire.GCSFilterRegular:
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
peerLog.Debug("Filter request for unknown headers for "+
|
||||||
|
"filter: %v", msg.FilterType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
startHeight := int32(msg.StartHeight)
|
startHeight := int32(msg.StartHeight)
|
||||||
maxResults := wire.MaxCFHeadersPerMsg
|
maxResults := wire.MaxCFHeadersPerMsg
|
||||||
|
|
||||||
// If StartHeight is positive, fetch the predecessor block hash so we can
|
// If StartHeight is positive, fetch the predecessor block hash so we
|
||||||
// populate the PrevFilterHeader field.
|
// can populate the PrevFilterHeader field.
|
||||||
if msg.StartHeight > 0 {
|
if msg.StartHeight > 0 {
|
||||||
startHeight--
|
startHeight--
|
||||||
maxResults++
|
maxResults++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the hashes from the block index.
|
// Fetch the hashes from the block index.
|
||||||
hashList, err := sp.server.chain.HeightToHashRange(startHeight,
|
hashList, err := sp.server.chain.HeightToHashRange(
|
||||||
&msg.StopHash, maxResults)
|
startHeight, &msg.StopHash, maxResults,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Debugf("Invalid getcfheaders request: %v", err)
|
peerLog.Debugf("Invalid getcfheaders request: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -833,8 +864,9 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the raw filter hash bytes from the database for all blocks.
|
// Fetch the raw filter hash bytes from the database for all blocks.
|
||||||
filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(hashPtrs,
|
filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(
|
||||||
msg.FilterType)
|
hashPtrs, msg.FilterType,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Errorf("Error retrieving cfilter hashes: %v", err)
|
peerLog.Errorf("Error retrieving cfilter hashes: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -892,6 +924,7 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
|
||||||
|
|
||||||
headersMsg.FilterType = msg.FilterType
|
headersMsg.FilterType = msg.FilterType
|
||||||
headersMsg.StopHash = msg.StopHash
|
headersMsg.StopHash = msg.StopHash
|
||||||
|
|
||||||
sp.QueueMessage(headersMsg, nil)
|
sp.QueueMessage(headersMsg, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -902,42 +935,78 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
blockHashes, err := sp.server.chain.IntervalBlockHashes(&msg.StopHash,
|
// We'll also ensure that the remote party is requesting a set of
|
||||||
wire.CFCheckptInterval)
|
// checkpoints for filters that we actually currently maintain.
|
||||||
|
switch msg.FilterType {
|
||||||
|
case wire.GCSFilterRegular:
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
peerLog.Debug("Filter request for unknown checkpoints for "+
|
||||||
|
"filter: %v", msg.FilterType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we know the client is fetching a filter that we know of,
|
||||||
|
// we'll fetch the block hashes et each check point interval so we can
|
||||||
|
// compare against our cache, and create new check points if necessary.
|
||||||
|
blockHashes, err := sp.server.chain.IntervalBlockHashes(
|
||||||
|
&msg.StopHash, wire.CFCheckptInterval,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Debugf("Invalid getcfilters request: %v", err)
|
peerLog.Debugf("Invalid getcfilters request: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var updateCache bool
|
checkptMsg := wire.NewMsgCFCheckpt(
|
||||||
var checkptCache []cfHeaderKV
|
msg.FilterType, &msg.StopHash, len(blockHashes),
|
||||||
|
)
|
||||||
|
|
||||||
if len(blockHashes) > len(checkptCache) {
|
// Fetch the current existing cache so we can decide if we need to
|
||||||
// Update the cache if the checkpoint chain is longer than the cached
|
// extend it or if its adequate as is.
|
||||||
// one. This ensures that the cache is relatively stable and mostly
|
|
||||||
// overlaps with the best chain, since it follows the longest chain
|
|
||||||
// heuristic.
|
|
||||||
updateCache = true
|
|
||||||
|
|
||||||
// Take write lock because we are going to update cache.
|
|
||||||
sp.server.cfCheckptCachesMtx.Lock()
|
sp.server.cfCheckptCachesMtx.Lock()
|
||||||
|
checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
|
||||||
|
|
||||||
|
// If the set of block hashes is beyond the current size of the cache,
|
||||||
|
// then we'll expand the size of the cache and also retain the write
|
||||||
|
// lock.
|
||||||
|
var updateCache bool
|
||||||
|
if len(blockHashes) > len(checkptCache) {
|
||||||
|
// Now that we know we'll need to modify the size of the cache,
|
||||||
|
// we'll defer the release of the write lock so we don't
|
||||||
|
// forget.
|
||||||
defer sp.server.cfCheckptCachesMtx.Unlock()
|
defer sp.server.cfCheckptCachesMtx.Unlock()
|
||||||
|
|
||||||
// Grow the checkptCache to be the length of blockHashes.
|
// We'll mark that we need to update the cache for below and
|
||||||
|
// also expand the size of the cache in place.
|
||||||
|
updateCache = true
|
||||||
|
|
||||||
additionalLength := len(blockHashes) - len(checkptCache)
|
additionalLength := len(blockHashes) - len(checkptCache)
|
||||||
checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType],
|
newEntries := make([]cfHeaderKV, additionalLength)
|
||||||
make([]cfHeaderKV, additionalLength)...)
|
|
||||||
|
peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
|
||||||
|
"block hashes", len(checkptCache), len(blockHashes))
|
||||||
|
|
||||||
|
checkptCache = append(
|
||||||
|
sp.server.cfCheckptCaches[msg.FilterType],
|
||||||
|
newEntries...,
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
updateCache = false
|
// Otherwise, we'll release the write lock, then grab the read
|
||||||
|
// lock, as the cache is already properly sized.
|
||||||
// Take reader lock because we are not going to update cache.
|
sp.server.cfCheckptCachesMtx.Unlock()
|
||||||
sp.server.cfCheckptCachesMtx.RLock()
|
sp.server.cfCheckptCachesMtx.RLock()
|
||||||
defer sp.server.cfCheckptCachesMtx.RUnlock()
|
|
||||||
|
|
||||||
checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
|
peerLog.Tracef("Serving stale cache of size %v",
|
||||||
|
len(checkptCache))
|
||||||
|
|
||||||
|
defer sp.server.cfCheckptCachesMtx.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate backwards until the block hash is found in the cache.
|
// Now that we know the cache is of an appropriate size, we'll iterate
|
||||||
|
// backwards until the find the block hash. We do this as it's possible
|
||||||
|
// a re-org has occurred so items in the db are now in the main china
|
||||||
|
// while the cache has been partially invalidated.
|
||||||
var forkIdx int
|
var forkIdx int
|
||||||
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
|
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
|
||||||
if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
|
if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
|
||||||
|
@ -945,29 +1014,33 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Populate results with cached checkpoints.
|
// Now that we know the how much of the cache is relevant for this
|
||||||
checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, &msg.StopHash,
|
// query, we'll populate our check point message with the cache as is.
|
||||||
len(blockHashes))
|
// Shortly below, we'll populate the new elements of the cache.
|
||||||
for i := 0; i < forkIdx; i++ {
|
for i := 0; i < forkIdx; i++ {
|
||||||
checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
|
checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look up any filter headers that aren't cached.
|
// We'll now collect the set of hashes that are beyond our cache so we
|
||||||
|
// can look up the filter headers to populate the final cache.
|
||||||
blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
|
blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
|
||||||
for i := forkIdx; i < len(blockHashes); i++ {
|
for i := forkIdx; i < len(blockHashes); i++ {
|
||||||
blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
|
blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
|
||||||
}
|
}
|
||||||
|
filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
|
||||||
filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
|
blockHashPtrs, msg.FilterType,
|
||||||
msg.FilterType)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Errorf("Error retrieving cfilter headers: %v", err)
|
peerLog.Errorf("Error retrieving cfilter headers: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now that we have the full set of filter headers, we'll add them to
|
||||||
|
// the checkpoint message, and also update our cache in line.
|
||||||
for i, filterHeaderBytes := range filterHeaders {
|
for i, filterHeaderBytes := range filterHeaders {
|
||||||
if len(filterHeaderBytes) == 0 {
|
if len(filterHeaderBytes) == 0 {
|
||||||
peerLog.Warnf("Could not obtain CF header for %v", blockHashPtrs[i])
|
peerLog.Warnf("Could not obtain CF header for %v",
|
||||||
|
blockHashPtrs[i])
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -979,6 +1052,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
checkptMsg.AddCFHeader(filterHeader)
|
checkptMsg.AddCFHeader(filterHeader)
|
||||||
|
|
||||||
|
// If the new main chain is longer than what's in the cache,
|
||||||
|
// then we'll override it beyond the fork point.
|
||||||
if updateCache {
|
if updateCache {
|
||||||
checkptCache[forkIdx+i] = cfHeaderKV{
|
checkptCache[forkIdx+i] = cfHeaderKV{
|
||||||
blockHash: blockHashes[forkIdx+i],
|
blockHash: blockHashes[forkIdx+i],
|
||||||
|
@ -987,6 +1063,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally, we'll update the cache if we need to, and send the final
|
||||||
|
// message back to the requesting peer.
|
||||||
if updateCache {
|
if updateCache {
|
||||||
sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
|
sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue