b9fd527d33
This commit is the result of several big changes being made to the wallet. In particular, the "handshake" (initial sync to the chain server) was quite racy and required proper synchronization. To make fixing this race easier, several other changes were made to the internal wallet data structures and much of the RPC server ended up being rewritten. First, all account support has been removed. The previous Account struct has been replaced with a Wallet structure, which includes a keystore for saving keys, and a txstore for storing relevant transactions. This decision has been made since it is the opinion of myself and other developers that bitcoind accounts are fundamentally broken (as accounts implemented by bitcoind support both arbitrary address groupings as well as moving balances between accounts -- these are fundamentally incompatible features), and since a BIP0032 keystore is soon planned to be implemented (at which point, "accounts" can return as HD extended keys). With the keystore handling the grouping of related keys, there is no reason to have many different Account structs, and the AccountManager has been removed as well. All RPC handlers that take an account option will only work with "" (the default account) or "*" if the RPC allows specifying all accounts. Second, much of the RPC server has been cleaned up. The global variables for the RPC server and chain server client have been moved to part of the rpcServer struct, and the handlers for each RPC method that are looked up change depending on which components have been set. Passthrough requests are also no longer handled specially, but when the chain server is set, a handler to perform the passthrough will be returned if the method is not otherwise a wallet RPC. 
The notification system for websocket clients has also been rewritten so wallet components can send notifications through channels, rather than requiring direct access to the RPC server itself, or worse still, sending directly to a websocket client's send channel. In the future, this will enable proper registration of notifications, rather than unsolicited broadcasts to every connected websocket client (see issue #84). Finally, and the main reason why much of this cleanup was necessary, the races during initial sync with the chain server have been fixed. Previously, when the 'Handshake' was run, a rescan would occur which would perform modifications to Account data structures as notifications were received. Synchronization was provided with a single binary semaphore which serialized all access to wallet and account data. However, the Handshake itself was not able to run with this lock (or else notifications would block), and many data races would occur as both notifications were being handled. If GOMAXPROCS was ever increased beyond 1, btcwallet would always immediately crash due to invalid addresses caused by the data races on startup. To fix this, the single lock for all wallet access has been replaced with mutexes for both the keystore and txstore. Handling of btcd notifications and client requests may now occur simultaneously. GOMAXPROCS has also been set to the number of logical CPUs at the beginning of main, since with the data races fixed, there's no reason to prevent the extra parallelism gained by increasing it. Closes #78. Closes #101. Closes #110.
306 lines
7.9 KiB
Go
306 lines
7.9 KiB
Go
/*
|
|
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
package chain
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/conformal/btcnet"
|
|
"github.com/conformal/btcrpcclient"
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwallet/keystore"
|
|
"github.com/conformal/btcwallet/txstore"
|
|
"github.com/conformal/btcwire"
|
|
"github.com/conformal/btcws"
|
|
)
|
|
|
|
type Client struct {
|
|
*btcrpcclient.Client
|
|
netParams *btcnet.Params
|
|
|
|
enqueueNotification chan interface{}
|
|
dequeueNotification chan interface{}
|
|
currentBlock chan *keystore.BlockStamp
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
started bool
|
|
quitMtx sync.Mutex
|
|
}
|
|
|
|
func NewClient(net *btcnet.Params, connect, user, pass string, certs []byte) (*Client, error) {
|
|
client := Client{
|
|
netParams: net,
|
|
enqueueNotification: make(chan interface{}),
|
|
dequeueNotification: make(chan interface{}),
|
|
currentBlock: make(chan *keystore.BlockStamp),
|
|
quit: make(chan struct{}),
|
|
}
|
|
initializedClient := make(chan struct{})
|
|
ntfnCallbacks := btcrpcclient.NotificationHandlers{
|
|
OnClientConnected: func() {
|
|
log.Info("Established connection to btcd")
|
|
},
|
|
OnBlockConnected: client.onBlockConnected,
|
|
OnBlockDisconnected: client.onBlockDisconnected,
|
|
OnRecvTx: client.onRecvTx,
|
|
OnRedeemingTx: client.onRedeemingTx,
|
|
OnRescanFinished: client.onRescanFinished,
|
|
OnRescanProgress: client.onRescanProgress,
|
|
}
|
|
conf := btcrpcclient.ConnConfig{
|
|
Host: connect,
|
|
Endpoint: "ws",
|
|
User: user,
|
|
Pass: pass,
|
|
Certificates: certs,
|
|
DisableConnectOnNew: true,
|
|
}
|
|
c, err := btcrpcclient.New(&conf, &ntfnCallbacks)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client.Client = c
|
|
close(initializedClient)
|
|
return &client, nil
|
|
}
|
|
|
|
func (c *Client) Start() error {
|
|
err := c.Connect(5) // attempt connection 5 tries at most
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Verify that the server is running on the expected network.
|
|
net, err := c.GetCurrentNet()
|
|
if err != nil {
|
|
c.Disconnect()
|
|
return err
|
|
}
|
|
if net != c.netParams.Net {
|
|
c.Disconnect()
|
|
return errors.New("mismatched networks")
|
|
}
|
|
|
|
c.quitMtx.Lock()
|
|
c.started = true
|
|
c.quitMtx.Unlock()
|
|
|
|
c.wg.Add(1)
|
|
go c.handler()
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Stop() {
|
|
c.quitMtx.Lock()
|
|
defer c.quitMtx.Unlock()
|
|
|
|
select {
|
|
case <-c.quit:
|
|
default:
|
|
close(c.quit)
|
|
c.Client.Shutdown()
|
|
|
|
if !c.started {
|
|
close(c.dequeueNotification)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) WaitForShutdown() {
|
|
c.Client.WaitForShutdown()
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *Client) Notifications() <-chan interface{} {
|
|
return c.dequeueNotification
|
|
}
|
|
|
|
func (c *Client) BlockStamp() (*keystore.BlockStamp, error) {
|
|
select {
|
|
case bs := <-c.currentBlock:
|
|
return bs, nil
|
|
case <-c.quit:
|
|
return nil, errors.New("disconnected")
|
|
}
|
|
}
|
|
|
|
// Notification types. These are defined here and processed from from reading
|
|
// a notificationChan to avoid handling these notifications directly in
|
|
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
|
|
// blocking client calls.
|
|
type (
|
|
BlockConnected keystore.BlockStamp
|
|
BlockDisconnected keystore.BlockStamp
|
|
RecvTx struct {
|
|
Tx *btcutil.Tx // Index is guaranteed to be set.
|
|
Block *txstore.Block // nil if unmined
|
|
}
|
|
RedeemingTx struct {
|
|
Tx *btcutil.Tx // Index is guaranteed to be set.
|
|
Block *txstore.Block // nil if unmined
|
|
}
|
|
RescanProgress struct {
|
|
Hash *btcwire.ShaHash
|
|
Height int32
|
|
Time time.Time
|
|
}
|
|
RescanFinished struct {
|
|
Hash *btcwire.ShaHash
|
|
Height int32
|
|
Time time.Time
|
|
}
|
|
)
|
|
|
|
// parseBlock parses a btcws definition of the block a tx is mined it to the
|
|
// Block structure of the txstore package, and the block index. This is done
|
|
// here since btcrpcclient doesn't parse this nicely for us.
|
|
func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err error) {
|
|
if block == nil {
|
|
return nil, btcutil.TxIndexUnknown, nil
|
|
}
|
|
blksha, err := btcwire.NewShaHashFromStr(block.Hash)
|
|
if err != nil {
|
|
return nil, btcutil.TxIndexUnknown, err
|
|
}
|
|
blk = &txstore.Block{
|
|
Height: block.Height,
|
|
Hash: *blksha,
|
|
Time: time.Unix(block.Time, 0),
|
|
}
|
|
return blk, block.Index, nil
|
|
}
|
|
|
|
func (c *Client) onBlockConnected(hash *btcwire.ShaHash, height int32) {
|
|
c.enqueueNotification <- BlockConnected{Hash: hash, Height: height}
|
|
}
|
|
|
|
func (c *Client) onBlockDisconnected(hash *btcwire.ShaHash, height int32) {
|
|
c.enqueueNotification <- BlockDisconnected{Hash: hash, Height: height}
|
|
}
|
|
|
|
func (c *Client) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
|
var blk *txstore.Block
|
|
index := btcutil.TxIndexUnknown
|
|
if block != nil {
|
|
var err error
|
|
blk, index, err = parseBlock(block)
|
|
if err != nil {
|
|
// Log and drop improper notification.
|
|
log.Errorf("recvtx notification bad block: %v", err)
|
|
return
|
|
}
|
|
}
|
|
tx.SetIndex(index)
|
|
c.enqueueNotification <- RecvTx{tx, blk}
|
|
}
|
|
|
|
func (c *Client) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
|
var blk *txstore.Block
|
|
index := btcutil.TxIndexUnknown
|
|
if block != nil {
|
|
var err error
|
|
blk, index, err = parseBlock(block)
|
|
if err != nil {
|
|
// Log and drop improper notification.
|
|
log.Errorf("recvtx notification bad block: %v", err)
|
|
return
|
|
}
|
|
}
|
|
tx.SetIndex(index)
|
|
c.enqueueNotification <- RedeemingTx{tx, blk}
|
|
}
|
|
|
|
func (c *Client) onRescanProgress(hash *btcwire.ShaHash, height int32, blkTime time.Time) {
|
|
c.enqueueNotification <- &RescanProgress{hash, height, blkTime}
|
|
}
|
|
|
|
func (c *Client) onRescanFinished(hash *btcwire.ShaHash, height int32, blkTime time.Time) {
|
|
c.enqueueNotification <- &RescanFinished{hash, height, blkTime}
|
|
}
|
|
|
|
// handler maintains a queue of notifications and the current state (best
|
|
// block) of the chain.
|
|
func (c *Client) handler() {
|
|
hash, height, err := c.GetBestBlock()
|
|
if err != nil {
|
|
close(c.quit)
|
|
c.wg.Done()
|
|
}
|
|
|
|
bs := &keystore.BlockStamp{Hash: hash, Height: height}
|
|
|
|
// TODO: Rather than leaving this as an unbounded queue for all types of
|
|
// notifications, try dropping ones where a later enqueued notification
|
|
// can fully invalidate one waiting to be processed. For example,
|
|
// blockconnected notifications for greater block heights can remove the
|
|
// need to process earlier blockconnected notifications still waiting
|
|
// here.
|
|
|
|
var notifications []interface{}
|
|
enqueue := c.enqueueNotification
|
|
var dequeue chan interface{}
|
|
var next interface{}
|
|
out:
|
|
for {
|
|
select {
|
|
case n, ok := <-enqueue:
|
|
if !ok {
|
|
// If no notifications are queued for handling,
|
|
// the queue is finished.
|
|
if len(notifications) == 0 {
|
|
break out
|
|
}
|
|
// nil channel so no more reads can occur.
|
|
enqueue = nil
|
|
continue
|
|
}
|
|
if len(notifications) == 0 {
|
|
next = n
|
|
dequeue = c.dequeueNotification
|
|
}
|
|
notifications = append(notifications, n)
|
|
|
|
case dequeue <- next:
|
|
if n, ok := next.(BlockConnected); ok {
|
|
bs = (*keystore.BlockStamp)(&n)
|
|
}
|
|
|
|
notifications[0] = nil
|
|
notifications = notifications[1:]
|
|
if len(notifications) != 0 {
|
|
next = notifications[0]
|
|
} else {
|
|
// If no more notifications can be enqueued, the
|
|
// queue is finished.
|
|
if enqueue == nil {
|
|
break out
|
|
}
|
|
dequeue = nil
|
|
}
|
|
|
|
case c.currentBlock <- bs:
|
|
|
|
case <-c.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(c.dequeueNotification)
|
|
c.wg.Done()
|
|
}
|