361 lines
9.7 KiB
Go
361 lines
9.7 KiB
Go
/*
|
|
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
package chain
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/conformal/btcnet"
|
|
"github.com/conformal/btcrpcclient"
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwallet/keystore"
|
|
"github.com/conformal/btcwallet/txstore"
|
|
"github.com/conformal/btcwire"
|
|
"github.com/conformal/btcws"
|
|
)
|
|
|
|
type Client struct {
|
|
*btcrpcclient.Client
|
|
netParams *btcnet.Params
|
|
|
|
enqueueNotification chan interface{}
|
|
dequeueNotification chan interface{}
|
|
currentBlock chan *keystore.BlockStamp
|
|
|
|
// Notification channels regarding the state of the client. These exist
|
|
// so other components can listen in on chain activity. These are
|
|
// initialized as nil, and must be created by calling one of the Listen*
|
|
// methods.
|
|
connected chan bool
|
|
notificationLock sync.Locker
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
started bool
|
|
quitMtx sync.Mutex
|
|
}
|
|
|
|
func NewClient(net *btcnet.Params, connect, user, pass string, certs []byte) (*Client, error) {
|
|
client := Client{
|
|
netParams: net,
|
|
enqueueNotification: make(chan interface{}),
|
|
dequeueNotification: make(chan interface{}),
|
|
currentBlock: make(chan *keystore.BlockStamp),
|
|
notificationLock: new(sync.Mutex),
|
|
quit: make(chan struct{}),
|
|
}
|
|
ntfnCallbacks := btcrpcclient.NotificationHandlers{
|
|
OnClientConnected: client.onClientConnect,
|
|
OnBlockConnected: client.onBlockConnected,
|
|
OnBlockDisconnected: client.onBlockDisconnected,
|
|
OnRecvTx: client.onRecvTx,
|
|
OnRedeemingTx: client.onRedeemingTx,
|
|
OnRescanFinished: client.onRescanFinished,
|
|
OnRescanProgress: client.onRescanProgress,
|
|
}
|
|
conf := btcrpcclient.ConnConfig{
|
|
Host: connect,
|
|
Endpoint: "ws",
|
|
User: user,
|
|
Pass: pass,
|
|
Certificates: certs,
|
|
DisableConnectOnNew: true,
|
|
}
|
|
c, err := btcrpcclient.New(&conf, &ntfnCallbacks)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client.Client = c
|
|
return &client, nil
|
|
}
|
|
|
|
func (c *Client) Start() error {
|
|
err := c.Connect(5) // attempt connection 5 tries at most
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Verify that the server is running on the expected network.
|
|
net, err := c.GetCurrentNet()
|
|
if err != nil {
|
|
c.Disconnect()
|
|
return err
|
|
}
|
|
if net != c.netParams.Net {
|
|
c.Disconnect()
|
|
return errors.New("mismatched networks")
|
|
}
|
|
|
|
c.quitMtx.Lock()
|
|
c.started = true
|
|
c.quitMtx.Unlock()
|
|
|
|
c.wg.Add(1)
|
|
go c.handler()
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Stop() {
|
|
c.quitMtx.Lock()
|
|
defer c.quitMtx.Unlock()
|
|
|
|
select {
|
|
case <-c.quit:
|
|
default:
|
|
close(c.quit)
|
|
c.Client.Shutdown()
|
|
|
|
if !c.started {
|
|
close(c.dequeueNotification)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) WaitForShutdown() {
|
|
c.Client.WaitForShutdown()
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *Client) Notifications() <-chan interface{} {
|
|
return c.dequeueNotification
|
|
}
|
|
|
|
func (c *Client) BlockStamp() (*keystore.BlockStamp, error) {
|
|
select {
|
|
case bs := <-c.currentBlock:
|
|
return bs, nil
|
|
case <-c.quit:
|
|
return nil, errors.New("disconnected")
|
|
}
|
|
}
|
|
|
|
// Notification types. These are defined here and processed from from reading
|
|
// a notificationChan to avoid handling these notifications directly in
|
|
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
|
|
// blocking client calls.
|
|
type (
|
|
BlockConnected keystore.BlockStamp
|
|
BlockDisconnected keystore.BlockStamp
|
|
RecvTx struct {
|
|
Tx *btcutil.Tx // Index is guaranteed to be set.
|
|
Block *txstore.Block // nil if unmined
|
|
}
|
|
RedeemingTx struct {
|
|
Tx *btcutil.Tx // Index is guaranteed to be set.
|
|
Block *txstore.Block // nil if unmined
|
|
}
|
|
RescanProgress struct {
|
|
Hash *btcwire.ShaHash
|
|
Height int32
|
|
Time time.Time
|
|
}
|
|
RescanFinished struct {
|
|
Hash *btcwire.ShaHash
|
|
Height int32
|
|
Time time.Time
|
|
}
|
|
)
|
|
|
|
// parseBlock parses a btcws definition of the block a tx is mined it to the
|
|
// Block structure of the txstore package, and the block index. This is done
|
|
// here since btcrpcclient doesn't parse this nicely for us.
|
|
func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err error) {
|
|
if block == nil {
|
|
return nil, btcutil.TxIndexUnknown, nil
|
|
}
|
|
blksha, err := btcwire.NewShaHashFromStr(block.Hash)
|
|
if err != nil {
|
|
return nil, btcutil.TxIndexUnknown, err
|
|
}
|
|
blk = &txstore.Block{
|
|
Height: block.Height,
|
|
Hash: *blksha,
|
|
Time: time.Unix(block.Time, 0),
|
|
}
|
|
return blk, block.Index, nil
|
|
}
|
|
|
|
func (c *Client) onClientConnect() {
|
|
log.Info("Established websocket RPC connection to btcd")
|
|
c.notifyConnected(true)
|
|
}
|
|
|
|
func (c *Client) onBlockConnected(hash *btcwire.ShaHash, height int32) {
|
|
c.enqueueNotification <- BlockConnected{Hash: hash, Height: height}
|
|
}
|
|
|
|
func (c *Client) onBlockDisconnected(hash *btcwire.ShaHash, height int32) {
|
|
c.enqueueNotification <- BlockDisconnected{Hash: hash, Height: height}
|
|
}
|
|
|
|
func (c *Client) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
|
var blk *txstore.Block
|
|
index := btcutil.TxIndexUnknown
|
|
if block != nil {
|
|
var err error
|
|
blk, index, err = parseBlock(block)
|
|
if err != nil {
|
|
// Log and drop improper notification.
|
|
log.Errorf("recvtx notification bad block: %v", err)
|
|
return
|
|
}
|
|
}
|
|
tx.SetIndex(index)
|
|
c.enqueueNotification <- RecvTx{tx, blk}
|
|
}
|
|
|
|
func (c *Client) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
|
var blk *txstore.Block
|
|
index := btcutil.TxIndexUnknown
|
|
if block != nil {
|
|
var err error
|
|
blk, index, err = parseBlock(block)
|
|
if err != nil {
|
|
// Log and drop improper notification.
|
|
log.Errorf("recvtx notification bad block: %v", err)
|
|
return
|
|
}
|
|
}
|
|
tx.SetIndex(index)
|
|
c.enqueueNotification <- RedeemingTx{tx, blk}
|
|
}
|
|
|
|
func (c *Client) onRescanProgress(hash *btcwire.ShaHash, height int32, blkTime time.Time) {
|
|
c.enqueueNotification <- &RescanProgress{hash, height, blkTime}
|
|
}
|
|
|
|
func (c *Client) onRescanFinished(hash *btcwire.ShaHash, height int32, blkTime time.Time) {
|
|
c.enqueueNotification <- &RescanFinished{hash, height, blkTime}
|
|
}
|
|
|
|
// handler maintains a queue of notifications and the current state (best
|
|
// block) of the chain.
|
|
func (c *Client) handler() {
|
|
hash, height, err := c.GetBestBlock()
|
|
if err != nil {
|
|
close(c.quit)
|
|
c.wg.Done()
|
|
}
|
|
|
|
bs := &keystore.BlockStamp{Hash: hash, Height: height}
|
|
|
|
// TODO: Rather than leaving this as an unbounded queue for all types of
|
|
// notifications, try dropping ones where a later enqueued notification
|
|
// can fully invalidate one waiting to be processed. For example,
|
|
// blockconnected notifications for greater block heights can remove the
|
|
// need to process earlier blockconnected notifications still waiting
|
|
// here.
|
|
|
|
var notifications []interface{}
|
|
enqueue := c.enqueueNotification
|
|
var dequeue chan interface{}
|
|
var next interface{}
|
|
out:
|
|
for {
|
|
select {
|
|
case n, ok := <-enqueue:
|
|
if !ok {
|
|
// If no notifications are queued for handling,
|
|
// the queue is finished.
|
|
if len(notifications) == 0 {
|
|
break out
|
|
}
|
|
// nil channel so no more reads can occur.
|
|
enqueue = nil
|
|
continue
|
|
}
|
|
if len(notifications) == 0 {
|
|
next = n
|
|
dequeue = c.dequeueNotification
|
|
}
|
|
notifications = append(notifications, n)
|
|
|
|
case dequeue <- next:
|
|
if n, ok := next.(BlockConnected); ok {
|
|
bs = (*keystore.BlockStamp)(&n)
|
|
}
|
|
|
|
notifications[0] = nil
|
|
notifications = notifications[1:]
|
|
if len(notifications) != 0 {
|
|
next = notifications[0]
|
|
} else {
|
|
// If no more notifications can be enqueued, the
|
|
// queue is finished.
|
|
if enqueue == nil {
|
|
break out
|
|
}
|
|
dequeue = nil
|
|
}
|
|
|
|
case c.currentBlock <- bs:
|
|
|
|
case <-c.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(c.dequeueNotification)
|
|
c.wg.Done()
|
|
}
|
|
|
|
// ErrDuplicateListen is returned for any attempts to listen for the same
|
|
// notification more than once. If callers must pass along a notifiation to
|
|
// multiple places, they must broadcast it themself.
|
|
var ErrDuplicateListen = errors.New("duplicate listen")
|
|
|
|
type noopLocker struct{}
|
|
|
|
func (noopLocker) Lock() {}
|
|
func (noopLocker) Unlock() {}
|
|
|
|
// ListenConnected returns a channel that passes the current connection state
|
|
// of the client. This will be automatically sent to when the client is first
|
|
// connected, as well as the current state whenever NotifyConnected is
|
|
// forcibly called.
|
|
//
|
|
// If this is called twice, ErrDuplicateListen is returned.
|
|
func (c *Client) ListenConnected() (<-chan bool, error) {
|
|
c.notificationLock.Lock()
|
|
defer c.notificationLock.Unlock()
|
|
|
|
if c.connected != nil {
|
|
return nil, ErrDuplicateListen
|
|
}
|
|
c.connected = make(chan bool)
|
|
c.notificationLock = noopLocker{}
|
|
return c.connected, nil
|
|
}
|
|
|
|
func (c *Client) notifyConnected(connected bool) {
|
|
c.notificationLock.Lock()
|
|
if c.connected != nil {
|
|
c.connected <- connected
|
|
}
|
|
c.notificationLock.Unlock()
|
|
}
|
|
|
|
// NotifyConnected sends the channel notification for a connected or
|
|
// disconnected client. This is exported so it can be called by other
|
|
// packages which require notifying the current connection state.
|
|
//
|
|
// TODO: This shouldn't exist, but the current notification API requires it.
|
|
func (c *Client) NotifyConnected() {
|
|
connected := !c.Client.Disconnected()
|
|
c.notifyConnected(connected)
|
|
}
|