chain: use ConcurrentQueue within BitcoindClient to handle event notifications

This commit is contained in:
Wilmer Paulino 2018-06-25 17:24:23 -07:00
parent fc73cc9678
commit 2091ac0936
No known key found for this signature in database
GPG key ID: 6DF57B9F9514972F

View file

@ -32,9 +32,8 @@ type BitcoindClient struct {
zmqConnect string
zmqPollInterval time.Duration
enqueueNotification chan interface{}
dequeueNotification chan interface{}
currentBlock chan *waddrmgr.BlockStamp
notificationQueue *ConcurrentQueue
currentBlock chan *waddrmgr.BlockStamp
clientMtx sync.RWMutex
rescanUpdate chan interface{}
@ -73,19 +72,17 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass,
DisableTLS: true,
HTTPPostMode: true,
},
chainParams: chainParams,
zmqConnect: zmqConnect,
zmqPollInterval: zmqPollInterval,
enqueueNotification: make(chan interface{}),
dequeueNotification: make(chan interface{}),
currentBlock: make(chan *waddrmgr.BlockStamp),
rescanUpdate: make(chan interface{}),
watchOutPoints: make(map[wire.OutPoint]struct{}),
watchAddrs: make(map[string]struct{}),
watchTxIDs: make(map[chainhash.Hash]struct{}),
quit: make(chan struct{}),
memPool: make(map[chainhash.Hash]struct{}),
memPoolExp: make(map[int32]map[chainhash.Hash]struct{}),
chainParams: chainParams,
zmqConnect: zmqConnect,
zmqPollInterval: zmqPollInterval,
currentBlock: make(chan *waddrmgr.BlockStamp),
rescanUpdate: make(chan interface{}),
watchOutPoints: make(map[wire.OutPoint]struct{}),
watchAddrs: make(map[string]struct{}),
watchTxIDs: make(map[chainhash.Hash]struct{}),
quit: make(chan struct{}),
memPool: make(map[chainhash.Hash]struct{}),
memPoolExp: make(map[int32]map[chainhash.Hash]struct{}),
}
rpcClient, err := rpcclient.New(client.connConfig, nil)
if err != nil {
@ -389,7 +386,6 @@ func (c *BitcoindClient) Start() error {
c.quitMtx.Unlock()
c.wg.Add(2)
go c.handler()
go c.socketHandler(zmqClient)
return nil
}
@ -403,10 +399,7 @@ func (c *BitcoindClient) Stop() {
default:
close(c.quit)
c.client.Shutdown()
if !c.started {
close(c.dequeueNotification)
}
c.notificationQueue.Stop()
}
c.quitMtx.Unlock()
}
@ -423,7 +416,7 @@ func (c *BitcoindClient) WaitForShutdown() {
// may abort for running out memory, as unread notifications are queued for
// later reads.
func (c *BitcoindClient) Notifications() <-chan interface{} {
return c.dequeueNotification
return c.notificationQueue.ChanOut()
}
// SetStartTime is a non-interface method to set the birthday of the wallet
@ -452,7 +445,7 @@ func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
func (c *BitcoindClient) onClientConnect() {
select {
case c.enqueueNotification <- ClientConnected{}:
case c.notificationQueue.ChanIn() <- ClientConnected{}:
case <-c.quit:
}
}
@ -460,7 +453,7 @@ func (c *BitcoindClient) onClientConnect() {
func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) {
if c.notifying() {
select {
case c.enqueueNotification <- BlockConnected{
case c.notificationQueue.ChanIn() <- BlockConnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
@ -476,7 +469,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) {
if c.notifying() {
select {
case c.enqueueNotification <- FilteredBlockConnected{
case c.notificationQueue.ChanIn() <- FilteredBlockConnected{
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
@ -494,7 +487,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32,
func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) {
if c.notifying() {
select {
case c.enqueueNotification <- BlockDisconnected{
case c.notificationQueue.ChanIn() <- BlockDisconnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
@ -516,14 +509,14 @@ func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord,
}
select {
case c.enqueueNotification <- RelevantTx{rec, blk}:
case c.notificationQueue.ChanIn() <- RelevantTx{rec, blk}:
case <-c.quit:
}
}
func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) {
select {
case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}:
case c.notificationQueue.ChanIn() <- &RescanProgress{hash, height, blkTime}:
case <-c.quit:
}
}
@ -531,7 +524,7 @@ func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, bl
func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) {
log.Infof("Rescan finished at %d (%s)", height, hash)
select {
case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}:
case c.notificationQueue.ChanIn() <- &RescanFinished{hash, height, blkTime}:
case <-c.quit:
}
@ -1177,83 +1170,3 @@ func (c *BitcoindClient) filterTx(tx *wire.MsgTx,
return notifyTx, rec, nil
}
// handler maintains a queue of notifications and the current state (best
// block) of the chain.
func (c *BitcoindClient) handler() {
hash, height, err := c.GetBestBlock()
if err != nil {
log.Errorf("Failed to receive best block from chain server: %v", err)
c.Stop()
c.wg.Done()
return
}
bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.
// TODO(aakselrod): Factor this logic out so it can be reused for each
// chain back end, rather than copying it.
var notifications []interface{}
enqueue := c.enqueueNotification
var dequeue chan interface{}
var next interface{}
out:
for {
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(notifications) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(notifications) == 0 {
next = n
dequeue = c.dequeueNotification
}
notifications = append(notifications, n)
case dequeue <- next:
if n, ok := next.(BlockConnected); ok {
bs = &waddrmgr.BlockStamp{
Height: n.Height,
Hash: n.Hash,
}
}
notifications[0] = nil
notifications = notifications[1:]
if len(notifications) != 0 {
next = notifications[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}
case c.currentBlock <- bs:
case <-c.quit:
break out
}
}
c.Stop()
close(c.dequeueNotification)
c.wg.Done()
}