Improve chain notification handling.

Previously a new goroutine was launched for each notification in order to
avoid blocking chain from continuing while the notification is being
processed.  This approach had a couple of issues.

First, since goroutines are not guaranteed to execute in any given order,
the notifications were no longer handled in the same order as they were
sent.  For the current code, this is not a problem, but upcoming code that
handles a transaction memory pool, the order needs to be correct.

Second, goroutines are relatively cheap, but it's still quite a bit of
overhead to launch 3-4 goroutines per block.

This commit modifies the handling code to have a single sink executing in
a separate goroutine.  The main handler then adds the notifications to a
queue which is processed by the sink.  This approach retains the
non-blocking behavior of the previous approach, but also keeps the order
correct and, as an additional benefit, is also more efficient.
This commit is contained in:
Dave Collins 2013-09-28 22:01:39 -05:00
parent f1cd96ceb5
commit 9880cf4646

View file

@ -66,6 +66,7 @@ type blockManager struct {
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification
chainNotifySink chan *btcchain.Notification
wg sync.WaitGroup
quit chan bool
}
@ -435,14 +436,50 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
}
}
// chainNotificationSinkHandler is the sink for the chain notification handler.
// It actually responds to the notifications so the main chain notification
// handler does not block chain while processing notifications. It must be run
// as a goroutine.
func (b *blockManager) chainNotificationSinkHandler() {
out:
for {
select {
case notification := <-b.chainNotifySink:
b.handleNotifyMsg(notification)
case <-b.quit:
break out
}
}
b.wg.Done()
log.Trace("[BMGR] Chain notification sink done")
}
// chainNotificationHandler is the handler for asynchronous notifications from
// btcchain. It must be run as a goroutine.
func (b *blockManager) chainNotificationHandler() {
// pending is a list to queue notifications in order until the they can
// be processed by the sink. This is used to prevent blocking chain
// when it sends notifications while retaining order.
pending := list.New()
out:
for !b.shutdown {
// Sending on a nil channel always blocks and hence is ignored
// by select. Thus enable send only when the list is non-empty.
var firstItem *btcchain.Notification
var chainNotifySink chan *btcchain.Notification
if pending.Len() > 0 {
firstItem = pending.Front().Value.(*btcchain.Notification)
chainNotifySink = b.chainNotifySink
}
select {
case notification := <-b.chainNotify:
go b.handleNotifyMsg(notification)
pending.PushBack(notification)
case chainNotifySink <- firstItem:
pending.Remove(pending.Front())
case <-b.quit:
break out
@ -484,9 +521,10 @@ func (b *blockManager) Start() {
}
log.Trace("[BMGR] Starting block manager")
b.wg.Add(3)
b.wg.Add(4)
go b.syncHandler()
go b.blockHandler()
go b.chainNotificationSinkHandler()
go b.chainNotificationHandler()
b.started = true
}
@ -510,7 +548,7 @@ func (b *blockManager) Stop() error {
// newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server) *blockManager {
chainNotify := make(chan *btcchain.Notification, chanBufferSize)
chainNotify := make(chan *btcchain.Notification)
bm := blockManager{
server: s,
blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
@ -522,6 +560,7 @@ func newBlockManager(s *server) *blockManager {
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify,
chainNotifySink: make(chan *btcchain.Notification),
quit: make(chan bool),
}
bm.blockChain.DisableVerify(cfg.VerifyDisabled)