Change new get sync peer bits to a query channel.
Rather than using a dedicated channel for the sync peer request and reply, use a single query channel that accepts a query type as well as a reply channel. This will allow other queries to be added in the future without the various queries being racy.
This commit is contained in:
parent
aab3a6643c
commit
d949072d6d
1 changed files with 21 additions and 9 deletions
|
@ -70,6 +70,12 @@ type txMsg struct {
|
||||||
peer *peer
|
peer *peer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getSyncPeerMsg is a message type to be sent across the query channel for
|
||||||
|
// retrieving the current sync peer.
|
||||||
|
type getSyncPeerMsg struct {
|
||||||
|
reply chan *peer
|
||||||
|
}
|
||||||
|
|
||||||
// headerNode is used as a node in a list of headers that are linked together
|
// headerNode is used as a node in a list of headers that are linked together
|
||||||
// between checkpoints.
|
// between checkpoints.
|
||||||
type headerNode struct {
|
type headerNode struct {
|
||||||
|
@ -93,8 +99,7 @@ type blockManager struct {
|
||||||
processingReqs bool
|
processingReqs bool
|
||||||
syncPeer *peer
|
syncPeer *peer
|
||||||
msgChan chan interface{}
|
msgChan chan interface{}
|
||||||
syncPeerRequest chan bool
|
query chan interface{}
|
||||||
syncPeerResult chan *peer
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
quit chan bool
|
quit chan bool
|
||||||
|
|
||||||
|
@ -904,9 +909,16 @@ out:
|
||||||
// bitch and whine.
|
// bitch and whine.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the current sync peer.
|
// Queries used for atomically retrieving internal state.
|
||||||
case <-b.syncPeerRequest:
|
case qmsg := <-b.query:
|
||||||
b.syncPeerResult <- b.syncPeer
|
switch msg := qmsg.(type) {
|
||||||
|
case getSyncPeerMsg:
|
||||||
|
msg.reply <- b.syncPeer
|
||||||
|
|
||||||
|
default:
|
||||||
|
bmgrLog.Warnf("Invalid query type in block "+
|
||||||
|
"handler query: %T", msg)
|
||||||
|
}
|
||||||
|
|
||||||
case <-b.quit:
|
case <-b.quit:
|
||||||
break out
|
break out
|
||||||
|
@ -1112,8 +1124,9 @@ func (b *blockManager) Stop() error {
|
||||||
|
|
||||||
// SyncPeer returns the current sync peer.
|
// SyncPeer returns the current sync peer.
|
||||||
func (b *blockManager) SyncPeer() *peer {
|
func (b *blockManager) SyncPeer() *peer {
|
||||||
b.syncPeerRequest <- true
|
reply := make(chan *peer)
|
||||||
return <-b.syncPeerResult
|
b.query <- getSyncPeerMsg{reply: reply}
|
||||||
|
return <-reply
|
||||||
}
|
}
|
||||||
|
|
||||||
// newBlockManager returns a new bitcoin block manager.
|
// newBlockManager returns a new bitcoin block manager.
|
||||||
|
@ -1131,8 +1144,7 @@ func newBlockManager(s *server) (*blockManager, error) {
|
||||||
requestedBlocks: make(map[btcwire.ShaHash]bool),
|
requestedBlocks: make(map[btcwire.ShaHash]bool),
|
||||||
lastBlockLogTime: time.Now(),
|
lastBlockLogTime: time.Now(),
|
||||||
msgChan: make(chan interface{}, cfg.MaxPeers*3),
|
msgChan: make(chan interface{}, cfg.MaxPeers*3),
|
||||||
syncPeerRequest: make(chan bool, 1),
|
query: make(chan interface{}),
|
||||||
syncPeerResult: make(chan *peer, 1),
|
|
||||||
headerList: list.New(),
|
headerList: list.New(),
|
||||||
quit: make(chan bool),
|
quit: make(chan bool),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue