Merge pull request #511 from wpaulino/bitcoind-rescan-client

chain: share the same bitcoind connection between multiple rescan clients
This commit is contained in:
Olaoluwa Osuntokun 2018-07-31 19:53:37 -07:00 committed by GitHub
commit 1ede0a1a66
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 1782 additions and 1330 deletions

File diff suppressed because it is too large Load diff

1254
chain/bitcoind_client.go Normal file

File diff suppressed because it is too large Load diff

362
chain/bitcoind_conn.go Normal file
View file

@ -0,0 +1,362 @@
package chain
import (
"bytes"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/gozmq"
)
// BitcoindConn represents a persistent client connection to a bitcoind node
// that listens for events read from a ZMQ connection.
type BitcoindConn struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
// rescanClientCounter is an atomic counter that assigns a unique ID to
// each new bitcoind rescan client using the current bitcoind
// connection.
rescanClientCounter uint64
// chainParams identifies the current network the bitcoind node is
// running on.
chainParams *chaincfg.Params
// client is the RPC client to the bitcoind node.
client *rpcclient.Client
// zmqBlockHost is the host listening for ZMQ connections that will be
// responsible for delivering raw transaction events.
zmqBlockHost string
// zmqTxHost is the host listening for ZMQ connections that will be
// responsible for delivering raw transaction events.
zmqTxHost string
// zmqPollInterval is the interval at which we'll attempt to retrieve an
// event from the ZMQ connection.
zmqPollInterval time.Duration
// rescanClients is the set of active bitcoind rescan clients to which
// ZMQ event notfications will be sent to.
rescanClientsMtx sync.Mutex
rescanClients map[uint64]*BitcoindClient
quit chan struct{}
wg sync.WaitGroup
}
// NewBitcoindConn creates a client connection to the node described by the host
// string. The connection is not established immediately, but must be done using
// the Start method. If the remote node does not operate on the same bitcoin
// network as described by the passed chain parameters, the connection will be
// disconnected.
func NewBitcoindConn(chainParams *chaincfg.Params,
host, user, pass, zmqBlockHost, zmqTxHost string,
zmqPollInterval time.Duration) (*BitcoindConn, error) {
clientCfg := &rpcclient.ConnConfig{
Host: host,
User: user,
Pass: pass,
DisableAutoReconnect: false,
DisableConnectOnNew: true,
DisableTLS: true,
HTTPPostMode: true,
}
client, err := rpcclient.New(clientCfg, nil)
if err != nil {
return nil, err
}
conn := &BitcoindConn{
chainParams: chainParams,
client: client,
zmqBlockHost: zmqBlockHost,
zmqTxHost: zmqTxHost,
zmqPollInterval: zmqPollInterval,
rescanClients: make(map[uint64]*BitcoindClient),
quit: make(chan struct{}),
}
return conn, nil
}
// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If
// successful, a goroutine is spawned to read events from the ZMQ connection.
// It's possible for this function to fail due to a limited number of connection
// attempts. This is done to prevent waiting forever on the connection to be
// established in the case that the node is down.
func (c *BitcoindConn) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
// Verify that the node is running on the expected network.
net, err := c.getCurrentNet()
if err != nil {
c.client.Disconnect()
return err
}
if net != c.chainParams.Net {
c.client.Disconnect()
return fmt.Errorf("expected network %v, got %v",
c.chainParams.Net, net)
}
// Establish two different ZMQ connections to bitcoind to retrieve block
// and transaction event notifications. We'll use two as a separation of
// concern to ensure one type of event isn't dropped from the connection
// queue due to another type of event filling it up.
zmqBlockConn, err := gozmq.Subscribe(
c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval,
)
if err != nil {
c.client.Disconnect()
return fmt.Errorf("unable to subscribe for zmq block events: "+
"%v", err)
}
zmqTxConn, err := gozmq.Subscribe(
c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval,
)
if err != nil {
c.client.Disconnect()
return fmt.Errorf("unable to subscribe for zmq tx events: %v",
err)
}
c.wg.Add(2)
go c.blockEventHandler(zmqBlockConn)
go c.txEventHandler(zmqTxConn)
return nil
}
// Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any
// active rescan clients.
func (c *BitcoindConn) Stop() {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return
}
for _, client := range c.rescanClients {
client.Stop()
}
close(c.quit)
c.client.Shutdown()
c.client.WaitForShutdown()
c.wg.Wait()
}
// blockEventHandler reads raw blocks events from the ZMQ block socket and
// forwards them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
defer c.wg.Done()
defer conn.Close()
log.Info("Started listening for bitcoind block notifications via ZMQ ",
"on", c.zmqBlockHost)
for {
// Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down.
select {
case <-c.quit:
return
default:
}
// Poll an event from the ZMQ socket. It's possible that the
// connection to the socket continuously times out, so we'll
// prevent logging this error to prevent spamming the logs.
msgBytes, err := conn.Receive()
if err != nil {
err, ok := err.(net.Error)
if !ok || !err.Timeout() {
log.Error(err)
}
continue
}
// We have an event! We'll now ensure it is a block event,
// deserialize it, and report it to the different rescan
// clients.
eventType := string(msgBytes[0])
switch eventType {
case "rawblock":
block := &wire.MsgBlock{}
r := bytes.NewReader(msgBytes[1])
if err := block.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize block: %v",
err)
continue
}
c.rescanClientsMtx.Lock()
for _, client := range c.rescanClients {
select {
case client.zmqBlockNtfns <- block:
case <-client.quit:
case <-c.quit:
c.rescanClientsMtx.Unlock()
return
}
}
c.rescanClientsMtx.Unlock()
default:
log.Warnf("Received unexpected event type from "+
"rawblock subscription: %v", eventType)
}
}
}
// txEventHandler reads raw blocks events from the ZMQ block socket and forwards
// them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
defer c.wg.Done()
defer conn.Close()
log.Info("Started listening for bitcoind transaction notifications "+
"via ZMQ on ", c.zmqTxHost)
for {
// Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down.
select {
case <-c.quit:
return
default:
}
// Poll an event from the ZMQ socket. It's possible that the
// connection to the socket continuously times out, so we'll
// prevent logging this error to prevent spamming the logs.
msgBytes, err := conn.Receive()
if err != nil {
err, ok := err.(net.Error)
if !ok || !err.Timeout() {
log.Error(err)
}
continue
}
// We have an event! We'll now ensure it is a transaction event,
// deserialize it, and report it to the different rescan
// clients.
eventType := string(msgBytes[0])
switch eventType {
case "rawtx":
tx := &wire.MsgTx{}
r := bytes.NewReader(msgBytes[1])
if err := tx.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize "+
"transaction: %v", err)
continue
}
c.rescanClientsMtx.Lock()
for _, client := range c.rescanClients {
select {
case client.zmqTxNtfns <- tx:
case <-client.quit:
case <-c.quit:
c.rescanClientsMtx.Unlock()
return
}
}
c.rescanClientsMtx.Unlock()
default:
log.Warnf("Received unexpected event type from rawtx "+
"subscription: %v", eventType)
}
}
}
// getCurrentNet returns the network on which the bitcoind node is running.
func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) {
hash, err := c.client.GetBlockHash(0)
if err != nil {
return 0, err
}
switch *hash {
case *chaincfg.TestNet3Params.GenesisHash:
return chaincfg.TestNet3Params.Net, nil
case *chaincfg.RegressionNetParams.GenesisHash:
return chaincfg.RegressionNetParams.Net, nil
case *chaincfg.MainNetParams.GenesisHash:
return chaincfg.MainNetParams.Net, nil
default:
return 0, fmt.Errorf("unknown network with genesis hash %v", hash)
}
}
// NewBitcoindClient returns a bitcoind client using the current bitcoind
// connection. This allows us to share the same connection using multiple
// clients. The birthday signifies the earliest time for which we should begin
// scanning the chain.
func (c *BitcoindConn) NewBitcoindClient(birthday time.Time) *BitcoindClient {
return &BitcoindClient{
quit: make(chan struct{}),
id: atomic.AddUint64(&c.rescanClientCounter, 1),
birthday: birthday,
chainParams: c.chainParams,
chainConn: c,
rescanUpdate: make(chan interface{}),
watchedAddresses: make(map[string]struct{}),
watchedOutPoints: make(map[wire.OutPoint]struct{}),
watchedTxs: make(map[chainhash.Hash]struct{}),
notificationQueue: NewConcurrentQueue(20),
zmqTxNtfns: make(chan *wire.MsgTx),
zmqBlockNtfns: make(chan *wire.MsgBlock),
mempool: make(map[chainhash.Hash]struct{}),
expiredMempool: make(map[int32]map[chainhash.Hash]struct{}),
}
}
// AddClient adds a client to the set of active rescan clients of the current
// chain connection. This allows the connection to include the specified client
// in its notification delivery.
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) AddClient(client *BitcoindClient) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
c.rescanClients[client.id] = client
}
// RemoveClient removes the client with the given ID from the set of active
// rescan clients. Once removed, the client will no longer receive block and
// transaction notifications from the chain connection.
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) RemoveClient(id uint64) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
delete(c.rescanClients, id)
}

88
chain/queue.go Normal file
View file

@ -0,0 +1,88 @@
package chain
import (
"container/list"
)
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
// Clients interact with the queue by pushing items into the in channel and
// popping items from the out channel. There is a goroutine that manages moving
// items from the in channel to the out channel in the correct order that must
// be started by calling Start().
type ConcurrentQueue struct {
chanIn chan interface{}
chanOut chan interface{}
quit chan struct{}
overflow *list.List
}
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
// the capacity of the output channel. When the size of the queue is below this
// threshold, pushes do not incur the overhead of the less efficient overflow
// structure.
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
return &ConcurrentQueue{
chanIn: make(chan interface{}),
chanOut: make(chan interface{}, bufferSize),
quit: make(chan struct{}),
overflow: list.New(),
}
}
// ChanIn returns a channel that can be used to push new items into the queue.
func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
return cq.chanIn
}
// ChanOut returns a channel that can be used to pop items from the queue.
func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
return cq.chanOut
}
// Start begins a goroutine that manages moving items from the in channel to
// the out channel. The queue tries to move items directly to the out channel
// minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue) Start() {
go func() {
for {
nextElement := cq.overflow.Front()
if nextElement == nil {
// The overflow queue is empty, so incoming
// items can be pushed directly to the output
// channel. However, if output channel is full,
// we'll push to the overflow list instead.
select {
case item := <-cq.chanIn:
select {
case cq.chanOut <- item:
case <-cq.quit:
return
default:
cq.overflow.PushBack(item)
}
case <-cq.quit:
return
}
} else {
// The overflow queue is not empty, so any new
// items get pushed to the back to preserve
// order.
select {
case item := <-cq.chanIn:
cq.overflow.PushBack(item)
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
}
}
}()
}
// Stop ends the goroutine that moves items from the in channel to the out
// channel.
func (cq *ConcurrentQueue) Stop() {
close(cq.quit)
}

View file

@ -16,10 +16,11 @@ import (
) )
func (w *Wallet) handleChainNotifications() { func (w *Wallet) handleChainNotifications() {
defer w.wg.Done()
chainClient, err := w.requireChainClient() chainClient, err := w.requireChainClient()
if err != nil { if err != nil {
log.Errorf("handleChainNotifications called without RPC client") log.Errorf("handleChainNotifications called without RPC client")
w.wg.Done()
return return
} }
@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() {
return err return err
} }
for n := range chainClient.Notifications() { for {
var notificationName string select {
var err error case n, ok := <-chainClient.Notifications():
switch n := n.(type) { if !ok {
case chain.ClientConnected: return
go sync(w)
case chain.BlockConnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.connectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockconnected"
case chain.BlockDisconnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.disconnectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockdisconnected"
case chain.RelevantTx:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
if len(n.RelevantTxs) > 0 {
err = walletdb.Update(w.db, func(
tx walletdb.ReadWriteTx) error {
var err error
for _, rec := range n.RelevantTxs {
err = w.addRelevantTx(tx, rec,
n.Block)
if err != nil {
return err
}
}
return nil
})
} }
notificationName = "filteredblockconnected"
// The following require some database maintenance, but also var notificationName string
// need to be reported to the wallet's rescan goroutine. var err error
case *chain.RescanProgress: switch n := n.(type) {
err = catchUpHashes(w, chainClient, n.Height) case chain.ClientConnected:
notificationName = "rescanprogress" go sync(w)
select { case chain.BlockConnected:
case w.rescanNotifications <- n: err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
case <-w.quitChan(): return w.connectBlock(tx, wtxmgr.BlockMeta(n))
return })
notificationName = "blockconnected"
case chain.BlockDisconnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.disconnectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockdisconnected"
case chain.RelevantTx:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
if len(n.RelevantTxs) > 0 {
err = walletdb.Update(w.db, func(
tx walletdb.ReadWriteTx) error {
var err error
for _, rec := range n.RelevantTxs {
err = w.addRelevantTx(tx, rec,
n.Block)
if err != nil {
return err
}
}
return nil
})
}
notificationName = "filteredblockconnected"
// The following require some database maintenance, but also
// need to be reported to the wallet's rescan goroutine.
case *chain.RescanProgress:
err = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
select {
case w.rescanNotifications <- n:
case <-w.quitChan():
return
}
case *chain.RescanFinished:
err = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
w.SetChainSynced(true)
select {
case w.rescanNotifications <- n:
case <-w.quitChan():
return
}
} }
case *chain.RescanFinished: if err != nil {
err = catchUpHashes(w, chainClient, n.Height) // On out-of-sync blockconnected notifications, only
notificationName = "rescanprogress" // send a debug message.
w.SetChainSynced(true) errStr := "Failed to process consensus server " +
select { "notification (name: `%s`, detail: `%v`)"
case w.rescanNotifications <- n: if notificationName == "blockconnected" &&
case <-w.quitChan(): strings.Contains(err.Error(),
return "couldn't get hash from database") {
} log.Debugf(errStr, notificationName, err)
} } else {
if err != nil { log.Errorf(errStr, notificationName, err)
// On out-of-sync blockconnected notifications, only }
// send a debug message.
errStr := "Failed to process consensus server " +
"notification (name: `%s`, detail: `%v`)"
if notificationName == "blockconnected" &&
strings.Contains(err.Error(),
"couldn't get hash from database") {
log.Debugf(errStr, notificationName, err)
} else {
log.Errorf(errStr, notificationName, err)
} }
case <-w.quit:
return
} }
} }
w.wg.Done()
} }
// connectBlock handles a chain server notification by marking a wallet // connectBlock handles a chain server notification by marking a wallet

View file

@ -173,8 +173,6 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) {
switch cc := chainClient.(type) { switch cc := chainClient.(type) {
case *chain.NeutrinoClient: case *chain.NeutrinoClient:
cc.SetStartTime(w.Manager.Birthday()) cc.SetStartTime(w.Manager.Birthday())
case *chain.BitcoindClient:
cc.SetStartTime(w.Manager.Birthday())
} }
w.chainClientLock.Unlock() w.chainClientLock.Unlock()