diff --git a/blockmanager.go b/blockmanager.go index 0d51dde2..7b853bf8 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -66,6 +66,7 @@ type blockManager struct { blockQueue chan *blockMsg invQueue chan *invMsg chainNotify chan *btcchain.Notification + chainNotifySink chan *btcchain.Notification wg sync.WaitGroup quit chan bool } @@ -435,14 +436,50 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { } } +// chainNotificationSinkHandler is the sink for the chain notification handler. +// It actually responds to the notifications so the main chain notification +// handler does not block chain while processing notifications. It must be run +// as a goroutine. +func (b *blockManager) chainNotificationSinkHandler() { +out: + for { + select { + case notification := <-b.chainNotifySink: + b.handleNotifyMsg(notification) + + case <-b.quit: + break out + } + } + b.wg.Done() + log.Trace("[BMGR] Chain notification sink done") +} + // chainNotificationHandler is the handler for asynchronous notifications from // btcchain. It must be run as a goroutine. func (b *blockManager) chainNotificationHandler() { + + // pending is a list to queue notifications in order until the they can + // be processed by the sink. This is used to prevent blocking chain + // when it sends notifications while retaining order. + pending := list.New() out: for !b.shutdown { + // Sending on a nil channel always blocks and hence is ignored + // by select. Thus enable send only when the list is non-empty. + var firstItem *btcchain.Notification + var chainNotifySink chan *btcchain.Notification + if pending.Len() > 0 { + firstItem = pending.Front().Value.(*btcchain.Notification) + chainNotifySink = b.chainNotifySink + } + select { case notification := <-b.chainNotify: - go b.handleNotifyMsg(notification) + pending.PushBack(notification) + + case chainNotifySink <- firstItem: + pending.Remove(pending.Front()) case <-b.quit: break out @@ -484,9 +521,10 @@ func (b *blockManager) Start() { } log.Trace("[BMGR] Starting block manager") - b.wg.Add(3) + b.wg.Add(4) go b.syncHandler() go b.blockHandler() + go b.chainNotificationSinkHandler() go b.chainNotificationHandler() b.started = true } @@ -510,7 +548,7 @@ func (b *blockManager) Stop() error { // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. func newBlockManager(s *server) *blockManager { - chainNotify := make(chan *btcchain.Notification, chanBufferSize) + chainNotify := make(chan *btcchain.Notification) bm := blockManager{ server: s, blockChain: btcchain.New(s.db, s.btcnet, chainNotify), @@ -522,6 +560,7 @@ func newBlockManager(s *server) *blockManager { blockQueue: make(chan *blockMsg, chanBufferSize), invQueue: make(chan *invMsg, chanBufferSize), chainNotify: chainNotify, + chainNotifySink: make(chan *btcchain.Notification), quit: make(chan bool), } bm.blockChain.DisableVerify(cfg.VerifyDisabled)