Move spvchain into neutrino and start integration w/btcwallet

This commit is contained in:
Alex 2017-05-19 17:45:38 -06:00 committed by Olaoluwa Osuntokun
parent 32adc3c43f
commit 3d81f856fd
19 changed files with 431 additions and 6592 deletions

View file

@ -1,10 +1,13 @@
package chain
import (
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
)
// Interface allows more than one backing blockchain source, such as a
@ -23,3 +26,44 @@ type Interface interface {
NotifyBlocks() error
Notifications() <-chan interface{}
}
// Notification types. These are defined here and processed from from reading
// a notificationChan to avoid handling these notifications directly in
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
// blocking client calls.
type (
// ClientConnected is a notification for when a client connection is
// opened or reestablished to the chain server.
ClientConnected struct{}
// BlockConnected is a notification for a newly-attached block to the
// best chain.
BlockConnected wtxmgr.BlockMeta
// BlockDisconnected is a notifcation that the block described by the
// BlockStamp was reorganized out of the best chain.
BlockDisconnected wtxmgr.BlockMeta
// RelevantTx is a notification for a transaction which spends wallet
// inputs or pays to a watched address.
RelevantTx struct {
TxRecord *wtxmgr.TxRecord
Block *wtxmgr.BlockMeta // nil if unmined
}
// RescanProgress is a notification describing the current status
// of an in-progress rescan.
RescanProgress struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}
// RescanFinished is a notification that a previous rescan request
// has finished.
RescanFinished struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}
)

312
chain/neutrino.go Normal file
View file

@ -0,0 +1,312 @@
package chain
import (
"errors"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcrpcclient"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightninglabs/neutrino"
)
// SPVChain is an implementation of the btcwalet chain.Interface interface.
type SPVChain struct {
cs *neutrino.ChainService
// We currently support one rescan/notifiction goroutine per client
rescan *neutrino.Rescan
enqueueNotification chan interface{}
dequeueNotification chan interface{}
currentBlock chan *waddrmgr.BlockStamp
quit chan struct{}
rescanQuit chan struct{}
wg sync.WaitGroup
started bool
scanning bool
clientMtx sync.Mutex
}
// NewSPVChain creates a new SPVChain struct with a backing ChainService
func NewSPVChain(chainService *neutrino.ChainService) *SPVChain {
return &SPVChain{cs: chainService}
}
// Start replicates the RPC client's Start method.
func (s *SPVChain) Start() error {
s.cs.Start()
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
s.enqueueNotification = make(chan interface{})
s.dequeueNotification = make(chan interface{})
s.currentBlock = make(chan *waddrmgr.BlockStamp)
s.quit = make(chan struct{})
s.started = true
s.wg.Add(1)
go s.notificationHandler()
}
return nil
}
// Stop replicates the RPC client's Stop method.
func (s *SPVChain) Stop() {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
return
}
close(s.quit)
s.started = false
}
// WaitForShutdown replicates the RPC client's WaitForShutdown method.
func (s *SPVChain) WaitForShutdown() {
s.wg.Wait()
}
// GetBlock replicates the RPC client's GetBlock command.
func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
// TODO(roasbeef): add a block cache?
// * which evication strategy? depends on use case
block, err := s.cs.GetBlockFromNetwork(*hash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
}
// GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) {
header, height, err := s.cs.LatestBlock()
if err != nil {
return nil, 0, err
}
hash := header.BlockHash()
return &hash, int32(height), nil
}
// BlockStamp returns the latest block notified by the client, or an error
// if the client has been shut down.
func (s *SPVChain) BlockStamp() (*waddrmgr.BlockStamp, error) {
select {
case bs := <-s.currentBlock:
return bs, nil
case <-s.quit:
return nil, errors.New("disconnected")
}
}
// SendRawTransaction replicates the RPC client's SendRawTransaction command.
func (s *SPVChain) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
*chainhash.Hash, error) {
err := s.cs.SendTransaction(tx)
if err != nil {
return nil, err
}
hash := tx.TxHash()
return &hash, nil
}
// Rescan replicates the RPC client's Rescan command.
func (s *SPVChain) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address,
outPoints []*wire.OutPoint) error {
s.clientMtx.Lock()
if !s.started {
s.clientMtx.Unlock()
return fmt.Errorf("can't do a rescan when the chain client " +
"is not started")
}
if s.scanning {
// Restart the rescan by killing the existing rescan.
close(s.rescanQuit)
}
s.rescanQuit = make(chan struct{})
s.scanning = true
s.clientMtx.Unlock()
return s.cs.Rescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
)
}
// NotifyBlocks replicates the RPC client's NotifyBlocks command.
func (s *SPVChain) NotifyBlocks() error {
return nil
}
// NotifyReceived replicates the RPC client's NotifyReceived command.
func (s *SPVChain) NotifyReceived() error {
return nil
}
// Notifications replicates the RPC client's Notifications method.
func (s *SPVChain) Notifications() <-chan interface{} {
return s.dequeueNotification
}
// onFilteredBlockConnected sends appropriate notifications to the notification
// channel.
func (s *SPVChain) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*btcutil.Tx) {
blockMeta := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
}
select {
case s.enqueueNotification <- BlockConnected(blockMeta):
case <-s.quit:
return
case <-s.rescanQuit:
return
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
blockMeta.Time)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Continue?
return
}
select {
case s.enqueueNotification <- RelevantTx{
TxRecord: rec,
Block: &blockMeta,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
}
bs, err := s.cs.SyncedTo()
if err != nil {
log.Errorf("Can't get chain service's best block: %s", err)
return
}
if bs.Hash == header.BlockHash() {
select {
case s.enqueueNotification <- RescanFinished{
Hash: &bs.Hash,
Height: bs.Height,
Time: header.Timestamp,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
}
}
// onBlockDisconnected sends appropriate notifications to the notification
// channel.
func (s *SPVChain) onBlockDisconnected(hash *chainhash.Hash, height int32,
t time.Time) {
select {
case s.enqueueNotification <- BlockDisconnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: t,
}:
case <-s.quit:
case <-s.rescanQuit:
}
}
// notificationHandler queues and dequeues notifications. There are currently
// no bounds on the queue, so the dequeue channel should be read continually to
// avoid running out of memory.
func (s *SPVChain) notificationHandler() {
hash, height, err := s.GetBestBlock()
if err != nil {
log.Errorf("Failed to get best block from chain service: %s",
err)
s.Stop()
s.wg.Done()
return
}
bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.
var notifications []interface{}
enqueue := s.enqueueNotification
var dequeue chan interface{}
var next interface{}
out:
for {
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(notifications) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(notifications) == 0 {
next = n
dequeue = s.dequeueNotification
}
notifications = append(notifications, n)
case dequeue <- next:
if n, ok := next.(BlockConnected); ok {
bs = &waddrmgr.BlockStamp{
Height: n.Height,
Hash: n.Hash,
}
}
notifications[0] = nil
notifications = notifications[1:]
if len(notifications) != 0 {
next = notifications[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}
case s.currentBlock <- bs:
case <-s.quit:
break out
}
}
s.Stop()
close(s.dequeueNotification)
s.wg.Done()
}

View file

@ -138,47 +138,6 @@ func (c *RPCClient) WaitForShutdown() {
c.wg.Wait()
}
// Notification types. These are defined here and processed from from reading
// a notificationChan to avoid handling these notifications directly in
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
// blocking client calls.
type (
// ClientConnected is a notification for when a client connection is
// opened or reestablished to the chain server.
ClientConnected struct{}
// BlockConnected is a notification for a newly-attached block to the
// best chain.
BlockConnected wtxmgr.BlockMeta
// BlockDisconnected is a notifcation that the block described by the
// BlockStamp was reorganized out of the best chain.
BlockDisconnected wtxmgr.BlockMeta
// RelevantTx is a notification for a transaction which spends wallet
// inputs or pays to a watched address.
RelevantTx struct {
TxRecord *wtxmgr.TxRecord
Block *wtxmgr.BlockMeta // nil if unmined
}
// RescanProgress is a notification describing the current status
// of an in-progress rescan.
RescanProgress struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}
// RescanFinished is a notification that a previous rescan request
// has finished.
RescanFinished struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}
)
// Notifications returns a channel of parsed notifications sent by the remote
// bitcoin RPC server. This channel must be continually read or the process
// may abort for running out memory, as unread notifications are queued for

View file

@ -19,9 +19,9 @@ import (
"github.com/btcsuite/btcwallet/internal/cfgutil"
"github.com/btcsuite/btcwallet/internal/legacy/keystore"
"github.com/btcsuite/btcwallet/netparams"
"github.com/btcsuite/btcwallet/spvsvc/spvchain"
"github.com/btcsuite/btcwallet/wallet"
flags "github.com/jessevdk/go-flags"
"github.com/lightninglabs/neutrino"
)
const (
@ -270,9 +270,9 @@ func loadConfig() (*config, []string, error) {
UseSPV: false,
AddPeers: []string{},
ConnectPeers: []string{},
MaxPeers: spvchain.MaxPeers,
BanDuration: spvchain.BanDuration,
BanThreshold: spvchain.BanThreshold,
MaxPeers: neutrino.MaxPeers,
BanDuration: neutrino.BanDuration,
BanThreshold: neutrino.BanThreshold,
}
// Pre-parse the command line options to see if an alternative config

64
glide.lock generated
View file

@ -1,28 +1,55 @@
<<<<<<< HEAD
hash: 2fe59efc96b0a2839297653da88cde89208f8f8cf4ced2bb1e828def57e3611b
updated: 2017-07-19T11:33:58.0769452-04:00
=======
hash: a567152c861b11d05c72b812d894a657b95ae39d08f85c0b7cbf1a235dd8a1a7
updated: 2017-05-18T20:28:25.480256291-06:00
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
imports:
- name: github.com/aead/siphash
version: e404fcfc888570cadd1610538e2dbc89f66af814
- name: github.com/boltdb/bolt
version: 583e8937c61f1af6513608ccc75c97b6abdf4ff9
- name: github.com/btcsuite/btcd
<<<<<<< HEAD
version: 47885ab8702485be6b6f87a03d4f3be0bc5c982c
=======
version: 58668c182103a00c5038c7130d7a23fad3a3bd34
repo: git@github.com:companyzero/btcdln
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
subpackages:
- addrmgr
- blockchain
- btcec
- btcjson
- chaincfg
- chaincfg/chainhash
- connmgr
- database
- peer
- txscript
- wire
- name: github.com/btcsuite/btclog
version: 84c8d2346e9fc8c7b947e243b9c24e6df9fd206a
- name: github.com/btcsuite/btcrpcclient
<<<<<<< HEAD
version: c72658166ae09457e6beb14e9112241e352ebd35
- name: github.com/btcsuite/btcutil
version: 5ffa719c3882fd2ec1e8b9f4978066701c31a343
=======
version: e15bd09b466511d8836c4017bc4cbc2e0ff05f82
repo: git@github.com:companyzero/btcrpcclientln
- name: github.com/btcsuite/btcutil
version: f814b35f15362b1122f55bd9034b6275552cc2f7
repo: git@github.com:companyzero/btcutilln
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
subpackages:
- base58
- gcs
- gcs/builder
- hdkeychain
- name: github.com/btcsuite/fastsha256
version: 637e656429416087660c84436a2a035d69d54e2e
- name: github.com/btcsuite/go-socks
version: 4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f
subpackages:
@ -37,8 +64,17 @@ imports:
- salsa20/salsa
- scrypt
- ssh/terminal
<<<<<<< HEAD
=======
- name: github.com/btcsuite/seelog
version: ae8891d029dd3c269dcfd6f261ad23e761acd99f
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
- name: github.com/btcsuite/websocket
version: 31079b6807923eb23992c421b114992b95131b55
- name: github.com/davecgh/go-spew
version: 346938d642f2ec3594ed81d874461961cd0faa76
subpackages:
- spew
- name: github.com/golang/protobuf
version: fec3b39b059c0f88fa6b20f5ed012b1aa203a8b4
subpackages:
@ -46,6 +82,7 @@ imports:
- ptypes/any
- name: github.com/jessevdk/go-flags
version: 1679536dcc895411a9f5848d9a0250be7856448c
<<<<<<< HEAD
- name: github.com/jrick/logrotate
version: a93b200c26cbae3bb09dd0dc2c7c7fe1468a034a
subpackages:
@ -56,6 +93,19 @@ imports:
- ripemd160
- name: golang.org/x/net
version: 8663ed5da4fd087c3cfb99a996e628b72e2f0948
=======
- name: github.com/kkdai/bstream
version: f391b8402d23024e7c0f624b31267a89998fca95
- name: github.com/lightninglabs/neutrino
version: 3a3b87d375c492f22de128f4f5df889e0340bd35
repo: git@github.com:lightninglabs/neutrino
- name: golang.org/x/crypto
version: 0fe963104e9d1877082f8fb38f816fcd97eb1d10
subpackages:
- ssh/terminal
- name: golang.org/x/net
version: 513929065c19401a1c7b76ecd942f9f86a0c061b
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
subpackages:
- context
- http2
@ -65,11 +115,19 @@ imports:
- lex/httplex
- trace
- name: golang.org/x/sys
<<<<<<< HEAD
version: cd2c276457edda6df7fb04895d3fd6a6add42926
subpackages:
- unix
- name: golang.org/x/text
version: 6353ef0f924300eea566d3438817aa4d3374817e
=======
version: e62c3de784db939836898e5c19ffd41bece347da
subpackages:
- unix
- name: golang.org/x/text
version: 19e51611da83d6be54ddafce4a4af510cb3e9ea4
>>>>>>> 4d479d4... Move spvchain into neutrino and start integration w/btcwallet
subpackages:
- secure/bidirule
- transform
@ -95,8 +153,4 @@ imports:
- status
- tap
- transport
testImports:
- name: github.com/davecgh/go-spew
version: 346938d642f2ec3594ed81d874461961cd0faa76
subpackages:
- spew
testImports: []

View file

@ -3,6 +3,8 @@ import:
- package: github.com/boltdb/bolt
version: ^1.3.0
- package: github.com/btcsuite/btcd
repo: git@github.com:companyzero/btcdln
version: segwit-cbf
subpackages:
- blockchain
- btcec
@ -13,9 +15,15 @@ import:
- wire
- package: github.com/btcsuite/btclog
- package: github.com/btcsuite/btcrpcclient
repo: git@github.com:companyzero/btcrpcclientln
version: pedro_cbf
- package: github.com/btcsuite/btcutil
repo: git@github.com:companyzero/btcutilln
version: gcs
subpackages:
- hdkeychain
- package: github.com/lightninglabs/neutrino
repo: git@github.com:lightninglabs/neutrino
- package: github.com/btcsuite/golangcrypto
subpackages:
- nacl/secretbox
@ -42,6 +50,5 @@ import:
- rotator
testImport:
- package: github.com/davecgh/go-spew
version: ^1.1.0
subpackages:
- spew

9
log.go
View file

@ -13,7 +13,6 @@ import (
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/rpc/legacyrpc"
"github.com/btcsuite/btcwallet/rpc/rpcserver"
"github.com/btcsuite/btcwallet/spvsvc/spvchain"
"github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/jrick/logrotate/rotator"
@ -73,7 +72,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"CHNS": chainLog,
"GRPC": grpcLog,
"RPCS": legacyRPCLog,
"SPVC": spvchainLog,
"BTCN": btcnLog,
}
// initLogRotator initializes the logging rotater to write logs to logFile and
@ -129,9 +128,9 @@ func useLogger(subsystemID string, logger btclog.Logger) {
case "RPCS":
legacyRPCLog = logger
legacyrpc.UseLogger(logger)
case "SPVC":
spvchainLog = logger
spvchain.UseLogger(logger)
case "BTCN":
btcnLog = logger
neutrino.UseLogger(logger)
}
r, err := rotator.New(logFile, 10*1024, false, 3)
if err != nil {

View file

@ -1,26 +0,0 @@
package spvsvc
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -1,76 +0,0 @@
package spvchain
import (
"sync"
"time"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil"
)
// blockProgressLogger provides periodic logging for other services in order
// to show users progress of certain "actions" involving some or all current
// blocks. Ex: syncing to best chain, indexing all blocks, etc.
type blockProgressLogger struct {
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
subsystemLogger btclog.Logger
progressAction string
sync.Mutex
}
// newBlockProgressLogger returns a new block progress logger.
// The progress message is templated as follows:
// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod}
// ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp})
func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger {
return &blockProgressLogger{
lastBlockLogTime: time.Now(),
progressAction: progressMessage,
subsystemLogger: logger,
}
}
// LogBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) {
b.Lock()
defer b.Unlock()
b.receivedLogBlocks++
b.receivedLogTx += int64(len(block.MsgBlock().Transactions))
now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if duration < time.Second*10 {
return
}
// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)",
b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx,
txStr, block.Height(), block.MsgBlock().Header.Timestamp)
b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}
func (b *blockProgressLogger) SetLastLogTime(time time.Time) {
b.lastBlockLogTime = time
}

File diff suppressed because it is too large Load diff

View file

@ -1,761 +0,0 @@
// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package spvchain
import (
"bytes"
"encoding/binary"
"fmt"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
)
const (
// LatestDBVersion is the most recent database version.
LatestDBVersion = 1
)
var (
// latestDBVersion is the most recent database version as a variable so
// the tests can change it to force errors.
latestDBVersion uint32 = LatestDBVersion
)
// Key names for various database fields.
var (
// Bucket names.
spvBucketName = []byte("spvchain")
blockHeaderBucketName = []byte("bh")
basicHeaderBucketName = []byte("bfh")
basicFilterBucketName = []byte("bf")
extHeaderBucketName = []byte("efh")
extFilterBucketName = []byte("ef")
// Db related key names (main bucket).
dbVersionName = []byte("dbver")
dbCreateDateName = []byte("dbcreated")
maxBlockHeightName = []byte("maxblockheight")
)
// uint32ToBytes converts a 32 bit unsigned integer into a 4-byte slice in
// little-endian order: 1 -> [1 0 0 0].
func uint32ToBytes(number uint32) []byte {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, number)
return buf
}
// uint64ToBytes converts a 64 bit unsigned integer into a 8-byte slice in
// little-endian order: 1 -> [1 0 0 0 0 0 0 0].
func uint64ToBytes(number uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, number)
return buf
}
// dbUpdateOption is a function type for the kind of DB update to be done.
// These can call each other and dbViewOption functions; however, they cannot
// be called by dbViewOption functions.
type dbUpdateOption func(bucket walletdb.ReadWriteBucket) error
// dbViewOption is a funciton type for the kind of data to be fetched from DB.
// These can call each other and can be called by dbUpdateOption functions;
// however, they cannot call dbUpdateOption functions.
type dbViewOption func(bucket walletdb.ReadBucket) error
// fetchDBVersion fetches the current manager version from the database.
func (s *ChainService) fetchDBVersion() (uint32, error) {
var version uint32
err := s.dbView(fetchDBVersion(&version))
return version, err
}
func fetchDBVersion(version *uint32) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
verBytes := bucket.Get(dbVersionName)
if verBytes == nil {
return fmt.Errorf("required version number not " +
"stored in database")
}
*version = binary.LittleEndian.Uint32(verBytes)
return nil
}
}
// putDBVersion stores the provided version to the database.
func (s *ChainService) putDBVersion(version uint32) error {
return s.dbUpdate(putDBVersion(version))
}
func putDBVersion(version uint32) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
verBytes := uint32ToBytes(version)
return bucket.Put(dbVersionName, verBytes)
}
}
// putMaxBlockHeight stores the max block height to the database.
func (s *ChainService) putMaxBlockHeight(maxBlockHeight uint32) error {
return s.dbUpdate(putMaxBlockHeight(maxBlockHeight))
}
func putMaxBlockHeight(maxBlockHeight uint32) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
maxBlockHeightBytes := uint32ToBytes(maxBlockHeight)
err := bucket.Put(maxBlockHeightName, maxBlockHeightBytes)
if err != nil {
return fmt.Errorf("failed to store max block height: %s", err)
}
return nil
}
}
// putBlock stores the provided block header and height, keyed to the block
// hash, in the database.
func (s *ChainService) putBlock(header wire.BlockHeader, height uint32) error {
return s.dbUpdate(putBlock(header, height))
}
func putBlock(header wire.BlockHeader, height uint32) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
var buf bytes.Buffer
err := header.Serialize(&buf)
if err != nil {
return err
}
_, err = buf.Write(uint32ToBytes(height))
if err != nil {
return err
}
blockHash := header.BlockHash()
bhBucket := bucket.NestedReadWriteBucket(blockHeaderBucketName)
err = bhBucket.Put(blockHash[:], buf.Bytes())
if err != nil {
return fmt.Errorf("failed to store SPV block info: %s",
err)
}
err = bhBucket.Put(uint32ToBytes(height), blockHash[:])
if err != nil {
return fmt.Errorf("failed to store block height info:"+
" %s", err)
}
return nil
}
}
// putFilter stores the provided filter, keyed to the block hash, in the
// appropriate filter bucket in the database.
func (s *ChainService) putFilter(blockHash chainhash.Hash, bucketName []byte,
filter *gcs.Filter) error {
return s.dbUpdate(putFilter(blockHash, bucketName, filter))
}
func putFilter(blockHash chainhash.Hash, bucketName []byte,
filter *gcs.Filter) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
var buf bytes.Buffer
_, err := buf.Write(filter.NBytes())
if err != nil {
return err
}
filterBucket := bucket.NestedReadWriteBucket(bucketName)
err = filterBucket.Put(blockHash[:], buf.Bytes())
if err != nil {
return fmt.Errorf("failed to store filter: %s", err)
}
return nil
}
}
// putBasicFilter stores the provided filter, keyed to the block hash, in the
// basic filter bucket in the database.
func (s *ChainService) putBasicFilter(blockHash chainhash.Hash,
filter *gcs.Filter) error {
return s.dbUpdate(putBasicFilter(blockHash, filter))
}
func putBasicFilter(blockHash chainhash.Hash,
filter *gcs.Filter) dbUpdateOption {
return putFilter(blockHash, basicFilterBucketName, filter)
}
// putExtFilter stores the provided filter, keyed to the block hash, in the
// extended filter bucket in the database.
func (s *ChainService) putExtFilter(blockHash chainhash.Hash,
filter *gcs.Filter) error {
return s.dbUpdate(putExtFilter(blockHash, filter))
}
func putExtFilter(blockHash chainhash.Hash,
filter *gcs.Filter) dbUpdateOption {
return putFilter(blockHash, extFilterBucketName, filter)
}
// putHeader stores the provided header, keyed to the block hash, in the
// appropriate filter header bucket in the database.
func (s *ChainService) putHeader(blockHash chainhash.Hash, bucketName []byte,
filterTip chainhash.Hash) error {
return s.dbUpdate(putHeader(blockHash, bucketName, filterTip))
}
func putHeader(blockHash chainhash.Hash, bucketName []byte,
filterTip chainhash.Hash) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
headerBucket := bucket.NestedReadWriteBucket(bucketName)
err := headerBucket.Put(blockHash[:], filterTip[:])
if err != nil {
return fmt.Errorf("failed to store filter header: %s", err)
}
return nil
}
}
// putBasicHeader stores the provided header, keyed to the block hash, in the
// basic filter header bucket in the database.
func (s *ChainService) putBasicHeader(blockHash chainhash.Hash,
filterTip chainhash.Hash) error {
return s.dbUpdate(putBasicHeader(blockHash, filterTip))
}
func putBasicHeader(blockHash chainhash.Hash,
filterTip chainhash.Hash) dbUpdateOption {
return putHeader(blockHash, basicHeaderBucketName, filterTip)
}
// putExtHeader stores the provided header, keyed to the block hash, in the
// extended filter header bucket in the database.
func (s *ChainService) putExtHeader(blockHash chainhash.Hash,
filterTip chainhash.Hash) error {
return s.dbUpdate(putExtHeader(blockHash, filterTip))
}
func putExtHeader(blockHash chainhash.Hash,
filterTip chainhash.Hash) dbUpdateOption {
return putHeader(blockHash, extHeaderBucketName, filterTip)
}
// getFilter retreives the filter, keyed to the provided block hash, from the
// appropriate filter bucket in the database.
func (s *ChainService) getFilter(blockHash chainhash.Hash,
bucketName []byte) (*gcs.Filter, error) {
var filter gcs.Filter
err := s.dbView(getFilter(blockHash, bucketName, &filter))
return &filter, err
}
func getFilter(blockHash chainhash.Hash, bucketName []byte,
filter *gcs.Filter) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
filterBucket := bucket.NestedReadBucket(bucketName)
filterBytes := filterBucket.Get(blockHash[:])
if len(filterBytes) == 0 {
return fmt.Errorf("failed to get filter")
}
calcFilter, err := gcs.FromNBytes(builder.DefaultP, filterBytes)
if calcFilter != nil {
*filter = *calcFilter
}
return err
}
}
// GetBasicFilter retrieves the filter, keyed to the provided block hash, from
// the basic filter bucket in the database.
func (s *ChainService) GetBasicFilter(blockHash chainhash.Hash) (*gcs.Filter,
error) {
var filter gcs.Filter
err := s.dbView(getBasicFilter(blockHash, &filter))
return &filter, err
}
func getBasicFilter(blockHash chainhash.Hash, filter *gcs.Filter) dbViewOption {
return getFilter(blockHash, basicFilterBucketName, filter)
}
// GetExtFilter retrieves the filter, keyed to the provided block hash, from
// the extended filter bucket in the database.
func (s *ChainService) GetExtFilter(blockHash chainhash.Hash) (*gcs.Filter,
error) {
var filter gcs.Filter
err := s.dbView(getExtFilter(blockHash, &filter))
return &filter, err
}
func getExtFilter(blockHash chainhash.Hash, filter *gcs.Filter) dbViewOption {
return getFilter(blockHash, extFilterBucketName, filter)
}
// getHeader retrieves the header, keyed to the provided block hash, from the
// appropriate filter header bucket in the database.
func (s *ChainService) getHeader(blockHash chainhash.Hash,
bucketName []byte) (*chainhash.Hash, error) {
var filterTip chainhash.Hash
err := s.dbView(getHeader(blockHash, bucketName, &filterTip))
return &filterTip, err
}
func getHeader(blockHash chainhash.Hash, bucketName []byte,
filterTip *chainhash.Hash) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
headerBucket := bucket.NestedReadBucket(bucketName)
headerBytes := headerBucket.Get(blockHash[:])
if len(filterTip) == 0 {
return fmt.Errorf("failed to get filter header")
}
calcFilterTip, err := chainhash.NewHash(headerBytes)
if calcFilterTip != nil {
*filterTip = *calcFilterTip
}
return err
}
}
// GetBasicHeader retrieves the header, keyed to the provided block hash, from
// the basic filter header bucket in the database.
func (s *ChainService) GetBasicHeader(blockHash chainhash.Hash) (
*chainhash.Hash, error) {
var filterTip chainhash.Hash
err := s.dbView(getBasicHeader(blockHash, &filterTip))
return &filterTip, err
}
func getBasicHeader(blockHash chainhash.Hash,
filterTip *chainhash.Hash) dbViewOption {
return getHeader(blockHash, basicHeaderBucketName, filterTip)
}
// GetExtHeader retrieves the header, keyed to the provided block hash, from the
// extended filter header bucket in the database.
func (s *ChainService) GetExtHeader(blockHash chainhash.Hash) (*chainhash.Hash,
error) {
var filterTip chainhash.Hash
err := s.dbView(getExtHeader(blockHash, &filterTip))
return &filterTip, err
}
func getExtHeader(blockHash chainhash.Hash,
filterTip *chainhash.Hash) dbViewOption {
return getHeader(blockHash, extHeaderBucketName, filterTip)
}
// rollBackLastBlock rolls back the last known block and returns the BlockStamp
// representing the new last known block.
func (s *ChainService) rollBackLastBlock() (*waddrmgr.BlockStamp, error) {
var bs waddrmgr.BlockStamp
err := s.dbUpdate(rollBackLastBlock(&bs))
return &bs, err
}
func rollBackLastBlock(bs *waddrmgr.BlockStamp) dbUpdateOption {
return func(bucket walletdb.ReadWriteBucket) error {
headerBucket := bucket.NestedReadWriteBucket(
blockHeaderBucketName)
var sync waddrmgr.BlockStamp
err := syncedTo(&sync)(bucket)
if err != nil {
return err
}
err = headerBucket.Delete(sync.Hash[:])
if err != nil {
return err
}
err = headerBucket.Delete(uint32ToBytes(uint32(sync.Height)))
if err != nil {
return err
}
err = putMaxBlockHeight(uint32(sync.Height - 1))(bucket)
if err != nil {
return err
}
sync = waddrmgr.BlockStamp{}
err = syncedTo(&sync)(bucket)
if sync != (waddrmgr.BlockStamp{}) {
*bs = sync
}
return err
}
}
// GetBlockByHash retrieves the block header, filter, and filter tip, based on
// the provided block hash, from the database.
func (s *ChainService) GetBlockByHash(blockHash chainhash.Hash) (
wire.BlockHeader, uint32, error) {
var header wire.BlockHeader
var height uint32
err := s.dbView(getBlockByHash(blockHash, &header, &height))
return header, height, err
}
func getBlockByHash(blockHash chainhash.Hash, header *wire.BlockHeader,
height *uint32) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
headerBucket := bucket.NestedReadBucket(blockHeaderBucketName)
blockBytes := headerBucket.Get(blockHash[:])
if len(blockBytes) < wire.MaxBlockHeaderPayload+4 {
return fmt.Errorf("failed to retrieve block info for"+
" hash %s: want %d bytes, got %d.", blockHash,
wire.MaxBlockHeaderPayload+4, len(blockBytes))
}
buf := bytes.NewReader(blockBytes[:wire.MaxBlockHeaderPayload])
err := header.Deserialize(buf)
if err != nil {
return fmt.Errorf("failed to deserialize block header "+
"for hash: %s", blockHash)
}
*height = binary.LittleEndian.Uint32(
blockBytes[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4])
return nil
}
}
// GetBlockHashByHeight retrieves the hash of a block by its height.
func (s *ChainService) GetBlockHashByHeight(height uint32) (chainhash.Hash,
error) {
var blockHash chainhash.Hash
err := s.dbView(getBlockHashByHeight(height, &blockHash))
return blockHash, err
}
func getBlockHashByHeight(height uint32,
blockHash *chainhash.Hash) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
headerBucket := bucket.NestedReadBucket(blockHeaderBucketName)
hashBytes := headerBucket.Get(uint32ToBytes(height))
if hashBytes == nil {
return fmt.Errorf("no block hash for height %d", height)
}
blockHash.SetBytes(hashBytes)
return nil
}
}
// GetBlockByHeight retrieves a block's information by its height.
func (s *ChainService) GetBlockByHeight(height uint32) (wire.BlockHeader,
error) {
var header wire.BlockHeader
err := s.dbView(getBlockByHeight(height, &header))
return header, err
}
func getBlockByHeight(height uint32, header *wire.BlockHeader) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
var blockHash chainhash.Hash
err := getBlockHashByHeight(height, &blockHash)(bucket)
if err != nil {
return err
}
var gotHeight uint32
err = getBlockByHash(blockHash, header, &gotHeight)(bucket)
if err != nil {
return err
}
if gotHeight != height {
return fmt.Errorf("Got height %d for block at "+
"requested height %d", gotHeight, height)
}
return nil
}
}
// BestSnapshot is a synonym for SyncedTo
func (s *ChainService) BestSnapshot() (*waddrmgr.BlockStamp, error) {
return s.SyncedTo()
}
// SyncedTo retrieves the most recent block's height and hash.
func (s *ChainService) SyncedTo() (*waddrmgr.BlockStamp, error) {
var bs waddrmgr.BlockStamp
err := s.dbView(syncedTo(&bs))
return &bs, err
}
func syncedTo(bs *waddrmgr.BlockStamp) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
var header wire.BlockHeader
var height uint32
err := latestBlock(&header, &height)(bucket)
if err != nil {
return err
}
bs.Hash = header.BlockHash()
bs.Height = int32(height)
return nil
}
}
// LatestBlock retrieves latest stored block's header and height.
func (s *ChainService) LatestBlock() (wire.BlockHeader, uint32, error) {
var bh wire.BlockHeader
var h uint32
err := s.dbView(latestBlock(&bh, &h))
return bh, h, err
}
func latestBlock(header *wire.BlockHeader, height *uint32) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
maxBlockHeightBytes := bucket.Get(maxBlockHeightName)
if maxBlockHeightBytes == nil {
return fmt.Errorf("no max block height stored")
}
*height = binary.LittleEndian.Uint32(maxBlockHeightBytes)
return getBlockByHeight(*height, header)(bucket)
}
}
// BlockLocatorFromHash returns a block locator based on the provided hash.
func (s *ChainService) BlockLocatorFromHash(hash chainhash.Hash) (
blockchain.BlockLocator, error) {
var locator blockchain.BlockLocator
err := s.dbView(blockLocatorFromHash(hash, &locator))
return locator, err
}
func blockLocatorFromHash(hash chainhash.Hash,
locator *blockchain.BlockLocator) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
// Append the initial hash
*locator = append(*locator, &hash)
// If hash isn't found in DB or this is the genesis block, return
// the locator as is
var header wire.BlockHeader
var height uint32
err := getBlockByHash(hash, &header, &height)(bucket)
if (err != nil) || (height == 0) {
return nil
}
decrement := uint32(1)
for (height > 0) && (len(*locator) < wire.MaxBlockLocatorsPerMsg) {
// Decrement by 1 for the first 10 blocks, then double the
// jump until we get to the genesis hash
if len(*locator) > 10 {
decrement *= 2
}
if decrement > height {
height = 0
} else {
height -= decrement
}
var blockHash chainhash.Hash
err := getBlockHashByHeight(height, &blockHash)(bucket)
if err != nil {
return nil
}
*locator = append(*locator, &blockHash)
}
return nil
}
}
// LatestBlockLocator returns the block locator for the latest known block
// stored in the database.
func (s *ChainService) LatestBlockLocator() (blockchain.BlockLocator, error) {
var locator blockchain.BlockLocator
err := s.dbView(latestBlockLocator(&locator))
return locator, err
}
func latestBlockLocator(locator *blockchain.BlockLocator) dbViewOption {
return func(bucket walletdb.ReadBucket) error {
var best waddrmgr.BlockStamp
err := syncedTo(&best)(bucket)
if err != nil {
return err
}
return blockLocatorFromHash(best.Hash, locator)(bucket)
}
}
// CheckConnectivity cycles through all of the block headers, from last to
// first, and makes sure they all connect to each other.
func (s *ChainService) CheckConnectivity() error {
return s.dbView(checkConnectivity())
}
func checkConnectivity() dbViewOption {
return func(bucket walletdb.ReadBucket) error {
var header wire.BlockHeader
var height uint32
err := latestBlock(&header, &height)(bucket)
if err != nil {
return fmt.Errorf("Couldn't retrieve latest block: %s",
err)
}
for height > 0 {
var newHeader wire.BlockHeader
var newHeight uint32
err := getBlockByHash(header.PrevBlock, &newHeader,
&newHeight)(bucket)
if err != nil {
return fmt.Errorf("Couldn't retrieve block %s:"+
" %s", header.PrevBlock, err)
}
if newHeader.BlockHash() != header.PrevBlock {
return fmt.Errorf("Block %s doesn't match "+
"block %s's PrevBlock (%s)",
newHeader.BlockHash(),
header.BlockHash(), header.PrevBlock)
}
if newHeight != height-1 {
return fmt.Errorf("Block %s doesn't have "+
"correct height: want %d, got %d",
newHeader.BlockHash(), height-1,
newHeight)
}
header = newHeader
height = newHeight
}
return nil
}
}
// createSPVNS creates the initial namespace structure needed for all of the
// SPV-related data. This includes things such as all of the buckets as well as
// the version and creation date.
func (s *ChainService) createSPVNS() error {
tx, err := s.db.BeginReadWriteTx()
if err != nil {
return err
}
spvBucket, err := tx.CreateTopLevelBucket(spvBucketName)
if err != nil {
return fmt.Errorf("failed to create main bucket: %s", err)
}
_, err = spvBucket.CreateBucketIfNotExists(blockHeaderBucketName)
if err != nil {
return fmt.Errorf("failed to create block header bucket: %s",
err)
}
_, err = spvBucket.CreateBucketIfNotExists(basicFilterBucketName)
if err != nil {
return fmt.Errorf("failed to create basic filter "+
"bucket: %s", err)
}
_, err = spvBucket.CreateBucketIfNotExists(basicHeaderBucketName)
if err != nil {
return fmt.Errorf("failed to create basic header "+
"bucket: %s", err)
}
_, err = spvBucket.CreateBucketIfNotExists(extFilterBucketName)
if err != nil {
return fmt.Errorf("failed to create extended filter "+
"bucket: %s", err)
}
_, err = spvBucket.CreateBucketIfNotExists(extHeaderBucketName)
if err != nil {
return fmt.Errorf("failed to create extended header "+
"bucket: %s", err)
}
createDate := spvBucket.Get(dbCreateDateName)
if createDate != nil {
log.Info("Wallet SPV namespace already created.")
return nil
}
log.Info("Creating wallet SPV namespace.")
basicFilter, err := builder.BuildBasicFilter(
s.chainParams.GenesisBlock)
if err != nil {
return err
}
basicFilterTip := builder.MakeHeaderForFilter(basicFilter,
s.chainParams.GenesisBlock.Header.PrevBlock)
extFilter, err := builder.BuildExtFilter(
s.chainParams.GenesisBlock)
if err != nil {
return err
}
extFilterTip := builder.MakeHeaderForFilter(extFilter,
s.chainParams.GenesisBlock.Header.PrevBlock)
err = putBlock(s.chainParams.GenesisBlock.Header, 0)(spvBucket)
if err != nil {
return err
}
err = putBasicFilter(*s.chainParams.GenesisHash, basicFilter)(spvBucket)
if err != nil {
return err
}
err = putBasicHeader(*s.chainParams.GenesisHash, basicFilterTip)(
spvBucket)
if err != nil {
return err
}
err = putExtFilter(*s.chainParams.GenesisHash, extFilter)(spvBucket)
if err != nil {
return err
}
err = putExtHeader(*s.chainParams.GenesisHash, extFilterTip)(spvBucket)
if err != nil {
return err
}
err = putDBVersion(latestDBVersion)(spvBucket)
if err != nil {
return err
}
err = putMaxBlockHeight(0)(spvBucket)
if err != nil {
return err
}
err = spvBucket.Put(dbCreateDateName,
uint64ToBytes(uint64(time.Now().Unix())))
if err != nil {
return fmt.Errorf("failed to store database creation "+
"time: %s", err)
}
return tx.Commit()
}
// dbUpdate allows the passed function to update the ChainService DB bucket.
func (s *ChainService) dbUpdate(updateFunc dbUpdateOption) error {
tx, err := s.db.BeginReadWriteTx()
if err != nil {
tx.Rollback()
return err
}
bucket := tx.ReadWriteBucket(spvBucketName)
err = updateFunc(bucket)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// dbView allows the passed function to read the ChainService DB bucket.
func (s *ChainService) dbView(viewFunc dbViewOption) error {
tx, err := s.db.BeginReadTx()
defer tx.Rollback()
if err != nil {
return err
}
bucket := tx.ReadBucket(spvBucketName)
return viewFunc(bucket)
}

View file

@ -1,70 +0,0 @@
package spvchain
import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/waddrmgr"
)
// SPVChain is an implementation of the btcwalet chain.Interface interface.
type SPVChain struct {
cs *ChainService
}
// NewSPVChain creates a new SPVChain struct with a backing ChainService
func NewSPVChain(chainService *ChainService) *SPVChain {
return &SPVChain{
cs: chainService,
}
}
// Start replicates the RPC client's Start method.
func (s *SPVChain) Start() error {
s.cs.Start()
return nil
}
// Stop replicates the RPC client's Stop method.
func (s *SPVChain) Stop() {
s.cs.Stop()
}
// WaitForShutdown replicates the RPC client's WaitForShutdown method.
func (s *SPVChain) WaitForShutdown() {
s.cs.Stop()
}
// SendRawTransaction replicates the RPC client's SendRawTransaction command.
func (s *SPVChain) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
*chainhash.Hash, error) {
err := s.cs.SendTransaction(tx)
if err != nil {
return nil, err
}
hash := tx.TxHash()
return &hash, nil
}
// GetBlock replicates the RPC client's GetBlock command.
func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
block, err := s.cs.GetBlockFromNetwork(*hash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
}
// GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) {
header, height, err := s.cs.LatestBlock()
if err != nil {
return nil, 0, err
}
hash := header.BlockHash()
return &hash, int32(height), nil
}
// BlockStamp replicates the RPC client's BlockStamp command.
func (s *SPVChain) BlockStamp() (*waddrmgr.BlockStamp, error) {
return s.cs.SyncedTo()
}

View file

@ -1,26 +0,0 @@
package spvchain
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -1,275 +0,0 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package spvchain
import (
"errors"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/connmgr"
)
type getConnCountMsg struct {
reply chan int32
}
type getPeersMsg struct {
reply chan []*serverPeer
}
type getOutboundGroup struct {
key string
reply chan int
}
type getAddedNodesMsg struct {
reply chan []*serverPeer
}
type disconnectNodeMsg struct {
cmp func(*serverPeer) bool
reply chan error
}
type connectNodeMsg struct {
addr string
permanent bool
reply chan error
}
type removeNodeMsg struct {
cmp func(*serverPeer) bool
reply chan error
}
type forAllPeersMsg struct {
closure func(*serverPeer)
}
// TODO: General - abstract out more of blockmanager into queries. It'll make
// this way more maintainable and usable.
// handleQuery is the central handler for all queries and commands from other
// goroutines related to peer state.
func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) {
switch msg := querymsg.(type) {
case getConnCountMsg:
nconnected := int32(0)
state.forAllPeers(func(sp *serverPeer) {
if sp.Connected() {
nconnected++
}
})
msg.reply <- nconnected
case getPeersMsg:
peers := make([]*serverPeer, 0, state.Count())
state.forAllPeers(func(sp *serverPeer) {
if !sp.Connected() {
return
}
peers = append(peers, sp)
})
msg.reply <- peers
case connectNodeMsg:
// TODO: duplicate oneshots?
// Limit max number of total peers.
if state.Count() >= MaxPeers {
msg.reply <- errors.New("max peers reached")
return
}
for _, peer := range state.persistentPeers {
if peer.Addr() == msg.addr {
if msg.permanent {
msg.reply <- errors.New("peer already connected")
} else {
msg.reply <- errors.New("peer exists as a permanent peer")
}
return
}
}
netAddr, err := addrStringToNetAddr(msg.addr)
if err != nil {
msg.reply <- err
return
}
// TODO: if too many, nuke a non-perm peer.
go s.connManager.Connect(&connmgr.ConnReq{
Addr: netAddr,
Permanent: msg.permanent,
})
msg.reply <- nil
case removeNodeMsg:
found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
if found {
msg.reply <- nil
} else {
msg.reply <- errors.New("peer not found")
}
case getOutboundGroup:
count, ok := state.outboundGroups[msg.key]
if ok {
msg.reply <- count
} else {
msg.reply <- 0
}
// Request a list of the persistent (added) peers.
case getAddedNodesMsg:
// Respond with a slice of the relavent peers.
peers := make([]*serverPeer, 0, len(state.persistentPeers))
for _, sp := range state.persistentPeers {
peers = append(peers, sp)
}
msg.reply <- peers
case disconnectNodeMsg:
// Check outbound peers.
found := disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
if found {
// If there are multiple outbound connections to the same
// ip:port, continue disconnecting them all until no such
// peers are found.
for found {
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
}
msg.reply <- nil
return
}
msg.reply <- errors.New("peer not found")
case forAllPeersMsg:
// TODO: Remove this when it's unnecessary due to wider use of
// queryPeers.
// Run the closure on all peers in the passed state.
state.forAllPeers(msg.closure)
// Even though this is a query, there's no reply channel as the
// forAllPeers method doesn't return anything. An error might be
// useful in the future.
}
}
// ConnectedCount returns the number of currently connected peers.
func (s *ChainService) ConnectedCount() int32 {
replyChan := make(chan int32)
s.query <- getConnCountMsg{reply: replyChan}
return <-replyChan
}
// OutboundGroupCount returns the number of peers connected to the given
// outbound group key.
func (s *ChainService) OutboundGroupCount(key string) int {
replyChan := make(chan int)
s.query <- getOutboundGroup{key: key, reply: replyChan}
return <-replyChan
}
// AddedNodeInfo returns an array of btcjson.GetAddedNodeInfoResult structures
// describing the persistent (added) nodes.
func (s *ChainService) AddedNodeInfo() []*serverPeer {
replyChan := make(chan []*serverPeer)
s.query <- getAddedNodesMsg{reply: replyChan}
return <-replyChan
}
// Peers returns an array of all connected peers.
func (s *ChainService) Peers() []*serverPeer {
replyChan := make(chan []*serverPeer)
s.query <- getPeersMsg{reply: replyChan}
return <-replyChan
}
// DisconnectNodeByAddr disconnects a peer by target address. Both outbound and
// inbound nodes will be searched for the target node. An error message will
// be returned if the peer was not found.
func (s *ChainService) DisconnectNodeByAddr(addr string) error {
replyChan := make(chan error)
s.query <- disconnectNodeMsg{
cmp: func(sp *serverPeer) bool { return sp.Addr() == addr },
reply: replyChan,
}
return <-replyChan
}
// DisconnectNodeByID disconnects a peer by target node id. Both outbound and
// inbound nodes will be searched for the target node. An error message will be
// returned if the peer was not found.
func (s *ChainService) DisconnectNodeByID(id int32) error {
replyChan := make(chan error)
s.query <- disconnectNodeMsg{
cmp: func(sp *serverPeer) bool { return sp.ID() == id },
reply: replyChan,
}
return <-replyChan
}
// RemoveNodeByAddr removes a peer from the list of persistent peers if
// present. An error will be returned if the peer was not found.
func (s *ChainService) RemoveNodeByAddr(addr string) error {
replyChan := make(chan error)
s.query <- removeNodeMsg{
cmp: func(sp *serverPeer) bool { return sp.Addr() == addr },
reply: replyChan,
}
return <-replyChan
}
// RemoveNodeByID removes a peer by node ID from the list of persistent peers
// if present. An error will be returned if the peer was not found.
func (s *ChainService) RemoveNodeByID(id int32) error {
replyChan := make(chan error)
s.query <- removeNodeMsg{
cmp: func(sp *serverPeer) bool { return sp.ID() == id },
reply: replyChan,
}
return <-replyChan
}
// ConnectNode adds `addr' as a new outbound peer. If permanent is true then the
// peer will be persistent and reconnect if the connection is lost.
// It is an error to call this with an already existing peer.
func (s *ChainService) ConnectNode(addr string, permanent bool) error {
replyChan := make(chan error)
s.query <- connectNodeMsg{addr: addr, permanent: permanent, reply: replyChan}
return <-replyChan
}
// ForAllPeers runs a closure over all peers (outbound and persistent) to which
// the ChainService is connected. Nothing is returned because the peerState's
// ForAllPeers method doesn't return anything as the closure passed to it
// doesn't return anything.
func (s *ChainService) ForAllPeers(closure func(sp *serverPeer)) {
s.query <- forAllPeersMsg{
closure: closure,
}
}

View file

@ -1,494 +0,0 @@
// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package spvchain
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
)
var (
// QueryTimeout specifies how long to wait for a peer to answer a query.
QueryTimeout = time.Second * 3
// QueryNumRetries specifies how many times to retry sending a query to
// each peer before we've concluded we aren't going to get a valid
// response. This allows to make up for missed messages in some
// instances.
QueryNumRetries = 2
)
// Query options can be modified per-query, unlike global options.
// TODO: Make more query options that override global options.
type queryOptions struct {
// timeout lets the query know how long to wait for a peer to
// answer the query before moving onto the next peer.
timeout time.Duration
// numRetries tells the query how many times to retry asking each peer
// the query.
numRetries uint8
// doneChan lets the query signal the caller when it's done, in case
// it's run in a goroutine.
doneChan chan<- struct{}
}
// QueryOption is a functional option argument to any of the network query
// methods, such as GetBlockFromNetwork and GetCFilter (when that resorts to a
// network query). These are always processed in order, with later options
// overriding earlier ones.
type QueryOption func(*queryOptions)
// defaultQueryOptions returns a queryOptions set to package-level defaults.
func defaultQueryOptions() *queryOptions {
return &queryOptions{
timeout: QueryTimeout,
numRetries: uint8(QueryNumRetries),
}
}
// Timeout is a query option that lets the query know how long to wait for
// each peer we ask the query to answer it before moving on.
func Timeout(timeout time.Duration) QueryOption {
return func(qo *queryOptions) {
qo.timeout = timeout
}
}
// NumRetries is a query option that lets the query know the maximum number of
// times each peer should be queried. The default is one.
func NumRetries(numRetries uint8) QueryOption {
return func(qo *queryOptions) {
qo.numRetries = numRetries
}
}
// DoneChan allows the caller to pass a channel that will get closed when the
// query is finished.
func DoneChan(doneChan chan<- struct{}) QueryOption {
return func(qo *queryOptions) {
qo.doneChan = doneChan
}
}
type spMsg struct {
sp *serverPeer
msg wire.Message
}
type spMsgSubscription struct {
msgChan chan<- spMsg
quitChan <-chan struct{}
wg *sync.WaitGroup
}
// queryPeers is a helper function that sends a query to one or more peers and
// waits for an answer. The timeout for queries is set by the QueryTimeout
// package-level variable.
func (s *ChainService) queryPeers(
// queryMsg is the message to send to each peer selected by selectPeer.
queryMsg wire.Message,
// checkResponse is caled for every message within the timeout period.
// The quit channel lets the query know to terminate because the
// required response has been found. This is done by closing the
// channel.
checkResponse func(sp *serverPeer, resp wire.Message,
quit chan<- struct{}),
// options takes functional options for executing the query.
options ...QueryOption,
) {
qo := defaultQueryOptions()
for _, option := range options {
option(qo)
}
// This is done in a single-threaded query because the peerState is held
// in a single thread. This is the only part of the query framework that
// requires access to peerState, so it's done once per query.
peers := s.Peers()
syncPeer := s.blockManager.SyncPeer()
// This will be shared state between the per-peer goroutines.
quit := make(chan struct{})
allQuit := make(chan struct{})
startQuery := make(chan struct{})
var wg sync.WaitGroup
var syncPeerTries uint32
// Increase this number to be able to handle more queries at once as
// each channel gets results for all queries, otherwise messages can
// get mixed and there's a vicious cycle of retries causing a bigger
// message flood, more of which get missed.
msgChan := make(chan spMsg)
var subwg sync.WaitGroup
subscription := spMsgSubscription{
msgChan: msgChan,
quitChan: allQuit,
wg: &subwg,
}
// Start a goroutine for each peer that potentially queries that peer.
for _, sp := range peers {
wg.Add(1)
go func(sp *serverPeer) {
numRetries := qo.numRetries
defer wg.Done()
defer sp.unsubscribeRecvMsgs(subscription)
// Should we do this when the goroutine gets a message
// via startQuery rather than at the launch of the
// goroutine?
if !sp.Connected() {
return
}
timeout := make(<-chan time.Time)
queryLoop:
for {
select {
case <-timeout:
// After timeout, we try to notify
// another of our peer goroutines to
// do a query until we get a signal to
// quit.
select {
case startQuery <- struct{}{}:
case <-quit:
return
case <-allQuit:
return
}
// At this point, we've sent startQuery.
// We return if we've run through this
// section of code numRetries times.
if numRetries--; numRetries == 0 {
return
}
case <-quit:
// After we're told to quit, we return.
return
case <-allQuit:
// After we're told to quit, we return.
return
case <-startQuery:
// We're the lucky peer whose turn it is
// to try to answer the current query.
// TODO: Add support for querying *all*
// peers simultaneously to avoid timeout
// delays.
// If the sync peer hasn't tried yet and
// we aren't the sync peer, don't do
// anything but forward the message down
// the startQuery channel until the
// sync peer gets a shot.
if sp == syncPeer {
atomic.StoreUint32(
&syncPeerTries, 1)
}
if atomic.LoadUint32(&syncPeerTries) ==
0 {
select {
case startQuery <- struct{}{}:
case <-quit:
return
case <-allQuit:
return
}
continue queryLoop
}
sp.subscribeRecvMsg(subscription)
// Don't want the peer hanging on send
// to the channel if we quit before
// reading the channel.
sentChan := make(chan struct{}, 1)
sp.QueueMessageWithEncoding(queryMsg,
sentChan, wire.WitnessEncoding)
select {
case <-sentChan:
case <-quit:
return
case <-allQuit:
return
}
timeout = time.After(qo.timeout)
default:
}
}
}(sp)
}
startQuery <- struct{}{}
// This goroutine will wait until all of the peer-query goroutines have
// terminated, and then initiate a query shutdown.
go func() {
wg.Wait()
// If we timed out on each goroutine and didn't quit or time out
// on the main goroutine, make sure our main goroutine knows to
// quit.
select {
case <-allQuit:
default:
close(allQuit)
}
// Close the done channel, if any
if qo.doneChan != nil {
close(qo.doneChan)
}
// Wait until all goroutines started by subscriptions have
// exited after we closed allQuit before letting the message
// channel get garbage collected.
subwg.Wait()
}()
// Loop for any messages sent to us via our subscription channel and
// check them for whether they satisfy the query. Break the loop if it's
// time to quit.
timeout := time.After(time.Duration(len(peers)+1) *
qo.timeout * time.Duration(qo.numRetries))
checkResponses:
for {
select {
case <-timeout:
// When we time out, close the allQuit channel
// if it hasn't already been closed.
select {
case <-allQuit:
default:
close(allQuit)
}
break checkResponses
case <-quit:
break checkResponses
case <-allQuit:
break checkResponses
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse
// gets stuck. This is a caveat for callers that
// should be fixed before exposing this function
// for public use.
checkResponse(sm.sp, sm.msg, quit)
}
}
}
// GetCFilter gets a cfilter from the database. Failing that, it requests the
// cfilter from the network and writes it to the database.
func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
extended bool, options ...QueryOption) (*gcs.Filter, error) {
getFilter := s.GetBasicFilter
getHeader := s.GetBasicHeader
putFilter := s.putBasicFilter
if extended {
getFilter = s.GetExtFilter
getHeader = s.GetExtHeader
putFilter = s.putExtFilter
}
filter, err := getFilter(blockHash)
if err == nil && filter != nil {
return filter, nil
}
// We didn't get the filter from the DB, so we'll set it to nil and try
// to get it from the network.
filter = nil
block, _, err := s.GetBlockByHash(blockHash)
if err != nil {
return nil, err
}
if block.BlockHash() != blockHash {
return nil, fmt.Errorf("Couldn't get header for block %s "+
"from database", blockHash)
}
curHeader, err := getHeader(blockHash)
if err != nil {
return nil, fmt.Errorf("Couldn't get cfheader for block %s "+
"from database", blockHash)
}
prevHeader, err := getHeader(block.PrevBlock)
if err != nil {
return nil, fmt.Errorf("Couldn't get cfheader for block %s "+
"from database", blockHash)
}
// If we're expecting a zero filter, just return a nil filter and don't
// bother trying to get it from the network. The caller will know
// there's no error because we're also returning a nil error.
if builder.MakeHeaderForFilter(nil, *prevHeader) == *curHeader {
return nil, nil
}
s.queryPeers(
// Send a wire.GetCFilterMsg
wire.NewMsgGetCFilter(&blockHash, extended),
// Check responses and if we get one that matches,
// end the query early.
func(sp *serverPeer, resp wire.Message,
quit chan<- struct{}) {
switch response := resp.(type) {
// We're only interested in "cfilter" messages.
case *wire.MsgCFilter:
// Only keep this going if we haven't already
// found a filter, or we risk closing an already
// closed channel.
if filter != nil {
return
}
if len(response.Data) < 4 {
// Filter data is too short.
// Ignore this message.
return
}
if blockHash != response.BlockHash {
// The response doesn't match our
// request. Ignore this message.
return
}
gotFilter, err := gcs.FromNBytes(
builder.DefaultP, response.Data)
if err != nil {
// Malformed filter data. We
// can ignore this message.
return
}
if builder.MakeHeaderForFilter(gotFilter,
*prevHeader) != *curHeader {
// Filter data doesn't match
// the headers we know about.
// Ignore this response.
return
}
// At this point, the filter matches
// what we know about it and we declare
// it sane. We can kill the query and
// pass the response back to the caller.
close(quit)
filter = gotFilter
default:
}
},
options...,
)
// If we've found a filter, write it to the database for next time.
if filter != nil {
putFilter(blockHash, filter)
log.Tracef("Wrote filter for block %s, extended: %t",
blockHash, extended)
}
return filter, nil
}
// GetBlockFromNetwork gets a block by requesting it from the network, one peer
// at a time, until one answers.
func (s *ChainService) GetBlockFromNetwork(
blockHash chainhash.Hash, options ...QueryOption) (*btcutil.Block,
error) {
blockHeader, height, err := s.GetBlockByHash(blockHash)
if err != nil || blockHeader.BlockHash() != blockHash {
return nil, fmt.Errorf("Couldn't get header for block %s "+
"from database", blockHash)
}
getData := wire.NewMsgGetData()
getData.AddInvVect(wire.NewInvVect(wire.InvTypeWitnessBlock,
&blockHash))
// The block is only updated from the checkResponse function argument,
// which is always called single-threadedly. We don't check the block
// until after the query is finished, so we can just write to it
// naively.
var foundBlock *btcutil.Block
s.queryPeers(
// Send a wire.GetCFilterMsg
getData,
// Check responses and if we get one that matches,
// end the query early.
func(sp *serverPeer, resp wire.Message,
quit chan<- struct{}) {
switch response := resp.(type) {
// We're only interested in "block" messages.
case *wire.MsgBlock:
// Only keep this going if we haven't already
// found a block, or we risk closing an already
// closed channel.
if foundBlock != nil {
return
}
// If this isn't our block, ignore it.
if response.BlockHash() != blockHash {
return
}
block := btcutil.NewBlock(response)
// Only set height if btcutil hasn't
// automagically put one in.
if block.Height() ==
btcutil.BlockHeightUnknown {
block.SetHeight(int32(height))
}
// If this claims our block but doesn't
// pass the sanity check, the peer is
// trying to bamboozle us. Disconnect
// it.
if err := blockchain.CheckBlockSanity(
block,
// We don't need to check PoW
// because by the time we get
// here, it's been checked
// during header synchronization
s.chainParams.PowLimit,
s.timeSource,
); err != nil {
log.Warnf("Invalid block for %s "+
"received from %s -- "+
"disconnecting peer", blockHash,
sp.Addr())
sp.Disconnect()
return
}
// At this point, the block matches what we know
// about it and we declare it sane. We can kill
// the query and pass the response back to the
// caller.
close(quit)
foundBlock = block
default:
}
},
options...,
)
if foundBlock == nil {
return nil, fmt.Errorf("Couldn't retrieve block %s from "+
"network", blockHash)
}
return foundBlock, nil
}
// SendTransaction sends a transaction to each peer. It returns an error if any
// peer rejects the transaction for any reason than that it's already known.
// TODO: Better privacy by sending to only one random peer and watching
// propagation, requires better peer selection support in query API.
func (s *ChainService) SendTransaction(tx *wire.MsgTx,
options ...QueryOption) error {
var err error
s.queryPeers(
tx,
func(sp *serverPeer, resp wire.Message, quit chan<- struct{}) {
switch response := resp.(type) {
case *wire.MsgReject:
if response.Hash == tx.TxHash() &&
!strings.Contains(response.Reason,
"already have transaction") {
err = log.Errorf("Transaction %s "+
"rejected by %s: %s",
tx.TxHash(), sp.Addr(),
response.Reason)
close(quit)
}
}
},
options...,
)
return err
}

View file

@ -1,777 +0,0 @@
// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package spvchain
import (
"bytes"
"fmt"
"sync/atomic"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcrpcclient"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/btcsuite/btcwallet/waddrmgr"
)
// Relevant package-level variables live here
var ()
// Functional parameters for Rescan
type rescanOptions struct {
chain *ChainService
queryOptions []QueryOption
ntfn btcrpcclient.NotificationHandlers
startBlock *waddrmgr.BlockStamp
endBlock *waddrmgr.BlockStamp
watchAddrs []btcutil.Address
watchOutPoints []wire.OutPoint
watchTxIDs []chainhash.Hash
watchList [][]byte
txIdx uint32
update <-chan *updateOptions
quit <-chan struct{}
}
// RescanOption is a functional option argument to any of the rescan and
// notification subscription methods. These are always processed in order, with
// later options overriding earlier ones.
type RescanOption func(ro *rescanOptions)
func defaultRescanOptions() *rescanOptions {
return &rescanOptions{}
}
// QueryOptions pass onto the underlying queries.
func QueryOptions(options ...QueryOption) RescanOption {
return func(ro *rescanOptions) {
ro.queryOptions = options
}
}
// NotificationHandlers specifies notification handlers for the rescan. These
// will always run in the same goroutine as the caller.
func NotificationHandlers(ntfn btcrpcclient.NotificationHandlers) RescanOption {
return func(ro *rescanOptions) {
ro.ntfn = ntfn
}
}
// StartBlock specifies the start block. The hash is checked first; if there's
// no such hash (zero hash avoids lookup), the height is checked next. If
// the height is 0 or the start block isn't specified, starts from the genesis
// block. This block is assumed to already be known, and no notifications will
// be sent for this block.
func StartBlock(startBlock *waddrmgr.BlockStamp) RescanOption {
return func(ro *rescanOptions) {
ro.startBlock = startBlock
}
}
// EndBlock specifies the end block. The hash is checked first; if there's no
// such hash (zero hash avoids lookup), the height is checked next. If the
// height is 0 or in the future or the end block isn't specified, the quit
// channel MUST be specified as Rescan will sync to the tip of the blockchain
// and continue to stay in sync and pass notifications. This is enforced at
// runtime.
func EndBlock(endBlock *waddrmgr.BlockStamp) RescanOption {
return func(ro *rescanOptions) {
ro.endBlock = endBlock
}
}
// WatchAddrs specifies the addresses to watch/filter for. Each call to this
// function adds to the list of addresses being watched rather than replacing
// the list. Each time a transaction spends to the specified address, the
// outpoint is added to the WatchOutPoints list.
func WatchAddrs(watchAddrs ...btcutil.Address) RescanOption {
return func(ro *rescanOptions) {
ro.watchAddrs = append(ro.watchAddrs, watchAddrs...)
}
}
// WatchOutPoints specifies the outpoints to watch for on-chain spends. Each
// call to this function adds to the list of outpoints being watched rather
// than replacing the list.
func WatchOutPoints(watchOutPoints ...wire.OutPoint) RescanOption {
return func(ro *rescanOptions) {
ro.watchOutPoints = append(ro.watchOutPoints, watchOutPoints...)
}
}
// WatchTxIDs specifies the outpoints to watch for on-chain spends. Each
// call to this function adds to the list of outpoints being watched rather
// than replacing the list.
func WatchTxIDs(watchTxIDs ...chainhash.Hash) RescanOption {
return func(ro *rescanOptions) {
ro.watchTxIDs = append(ro.watchTxIDs, watchTxIDs...)
}
}
// TxIdx specifies a hint transaction index into the block in which the UTXO
// is created (eg, coinbase is 0, next transaction is 1, etc.)
func TxIdx(txIdx uint32) RescanOption {
return func(ro *rescanOptions) {
ro.txIdx = txIdx
}
}
// QuitChan specifies the quit channel. This can be used by the caller to let
// an indefinite rescan (one with no EndBlock set) know it should gracefully
// shut down. If this isn't specified, an end block MUST be specified as Rescan
// must know when to stop. This is enforced at runtime.
func QuitChan(quit <-chan struct{}) RescanOption {
return func(ro *rescanOptions) {
ro.quit = quit
}
}
// updateChan specifies an update channel. This is for internal use by the
// Rescan.Update functionality.
func updateChan(update <-chan *updateOptions) RescanOption {
return func(ro *rescanOptions) {
ro.update = update
}
}
// Rescan is a single-threaded function that uses headers from the database and
// functional options as arguments.
func (s *ChainService) Rescan(options ...RescanOption) error {
ro := defaultRescanOptions()
ro.endBlock = &waddrmgr.BlockStamp{
Hash: *s.chainParams.GenesisHash,
Height: 0,
}
for _, option := range options {
option(ro)
}
ro.chain = s
// If we have something to watch, create a watch list.
for _, addr := range ro.watchAddrs {
ro.watchList = append(ro.watchList, addr.ScriptAddress())
}
for _, op := range ro.watchOutPoints {
ro.watchList = append(ro.watchList,
builder.OutPointToFilterEntry(op))
}
for _, txid := range ro.watchTxIDs {
ro.watchList = append(ro.watchList, txid[:])
}
if len(ro.watchList) == 0 {
return fmt.Errorf("Rescan must specify addresses and/or " +
"outpoints and/or TXIDs to watch")
}
// Check that we have either an end block or a quit channel.
if ro.endBlock != nil {
if (ro.endBlock.Hash != chainhash.Hash{}) {
_, height, err := s.GetBlockByHash(ro.endBlock.Hash)
if err != nil {
ro.endBlock.Hash = chainhash.Hash{}
} else {
ro.endBlock.Height = int32(height)
}
}
if (ro.endBlock.Hash == chainhash.Hash{}) {
if ro.endBlock.Height != 0 {
header, err := s.GetBlockByHeight(
uint32(ro.endBlock.Height))
if err == nil {
ro.endBlock.Hash = header.BlockHash()
} else {
ro.endBlock = &waddrmgr.BlockStamp{}
}
}
}
} else {
ro.endBlock = &waddrmgr.BlockStamp{}
}
if ro.quit == nil && ro.endBlock.Height == 0 {
return fmt.Errorf("Rescan request must specify a quit channel" +
" or valid end block")
}
// Track our position in the chain.
var curHeader wire.BlockHeader
curStamp := *ro.startBlock
// Find our starting block.
if (curStamp.Hash != chainhash.Hash{}) {
header, height, err := s.GetBlockByHash(curStamp.Hash)
if err == nil {
curHeader = header
curStamp.Height = int32(height)
} else {
curStamp.Hash = chainhash.Hash{}
}
}
if (curStamp.Hash == chainhash.Hash{}) {
if curStamp.Height == 0 {
curStamp.Hash = *s.chainParams.GenesisHash
} else {
header, err := s.GetBlockByHeight(
uint32(curStamp.Height))
if err == nil {
curHeader = header
curStamp.Hash = curHeader.BlockHash()
} else {
curHeader =
s.chainParams.GenesisBlock.Header
curStamp.Hash =
*s.chainParams.GenesisHash
curStamp.Height = 0
}
}
}
log.Tracef("Starting rescan from known block %d (%s)", curStamp.Height,
curStamp.Hash)
// Listen for notifications.
blockConnected := make(chan wire.BlockHeader)
blockDisconnected := make(chan wire.BlockHeader)
subscription := blockSubscription{
onConnectBasic: blockConnected,
onDisconnect: blockDisconnected,
quit: ro.quit,
}
// Loop through blocks, one at a time. This relies on the underlying
// ChainService API to send blockConnected and blockDisconnected
// notifications in the correct order.
current := false
rescanLoop:
for {
// If we're current, we wait for notifications.
if current {
// Wait for a signal that we have a newly connected
// header and cfheader, or a newly disconnected header;
// alternatively, forward ourselves to the next block
// if possible.
select {
case <-ro.quit:
s.unsubscribeBlockMsgs(subscription)
return nil
case update := <-ro.update:
rewound, err := ro.updateFilter(update,
&curStamp, &curHeader)
if err != nil {
return err
}
if rewound {
current = false
}
case header := <-blockConnected:
// Only deal with the next block from what we
// know about. Otherwise, it's in the future.
if header.PrevBlock != curStamp.Hash {
continue rescanLoop
}
curHeader = header
curStamp.Hash = header.BlockHash()
curStamp.Height++
case header := <-blockDisconnected:
// Only deal with it if it's the current block
// we know about. Otherwise, it's in the future.
if header.BlockHash() == curStamp.Hash {
// Run through notifications. This is
// all single-threaded. We include
// deprecated calls as they're still
// used, for now.
if ro.ntfn.
OnFilteredBlockDisconnected !=
nil {
ro.ntfn.OnFilteredBlockDisconnected(
curStamp.Height,
&curHeader)
}
if ro.ntfn.OnBlockDisconnected != nil {
ro.ntfn.OnBlockDisconnected(
&curStamp.Hash,
curStamp.Height,
curHeader.Timestamp)
}
header, _, err := s.GetBlockByHash(
header.PrevBlock)
if err != nil {
return err
}
curHeader = header
curStamp.Hash = header.BlockHash()
curStamp.Height--
}
continue rescanLoop
}
} else {
// Since we're not current, we try to manually advance
// the block. If we fail, we mark outselves as current
// and follow notifications.
header, err := s.GetBlockByHeight(uint32(
curStamp.Height + 1))
if err != nil {
log.Tracef("Rescan became current at %d (%s), "+
"subscribing to block notifications",
curStamp.Height, curStamp.Hash)
current = true
// Subscribe to block notifications.
s.subscribeBlockMsg(subscription)
continue rescanLoop
}
curHeader = header
curStamp.Height++
curStamp.Hash = header.BlockHash()
}
// At this point, we've found the block header that's next in
// our rescan. First, if we're sending out BlockConnected
// notifications, do that.
if ro.ntfn.OnBlockConnected != nil {
ro.ntfn.OnBlockConnected(&curStamp.Hash,
curStamp.Height, curHeader.Timestamp)
}
// Now we need to see if it matches the rescan's filters, so we
// get the basic filter from the DB or network.
var block *btcutil.Block
var relevantTxs []*btcutil.Tx
var bFilter, eFilter *gcs.Filter
var err error
key := builder.DeriveKey(&curStamp.Hash)
matched := false
bFilter, err = s.GetCFilter(curStamp.Hash, false)
if err != nil {
return err
}
if bFilter != nil && bFilter.N() != 0 {
// We see if any relevant transactions match.
matched, err = bFilter.MatchAny(key, ro.watchList)
if err != nil {
return err
}
}
if len(ro.watchTxIDs) > 0 {
eFilter, err = s.GetCFilter(curStamp.Hash, true)
if err != nil {
return err
}
}
if eFilter != nil && eFilter.N() != 0 {
// We see if any relevant transactions match.
matched, err = eFilter.MatchAny(key, ro.watchList)
if err != nil {
return err
}
}
// If we have no transactions, we just send an
// OnFilteredBlockConnected notification with no relevant
// transactions.
if matched {
// We've matched. Now we actually get the block
// and cycle through the transactions to see
// which ones are relevant.
block, err = s.GetBlockFromNetwork(
curStamp.Hash, ro.queryOptions...)
if err != nil {
return err
}
if block == nil {
return fmt.Errorf("Couldn't get block %d "+
"(%s) from network", curStamp.Height,
curStamp.Hash)
}
relevantTxs, err = ro.notifyBlock(block)
if err != nil {
return err
}
}
if ro.ntfn.OnFilteredBlockConnected != nil {
ro.ntfn.OnFilteredBlockConnected(curStamp.Height,
&curHeader, relevantTxs)
}
if curStamp.Hash == ro.endBlock.Hash || curStamp.Height ==
ro.endBlock.Height {
return nil
}
select {
case update := <-ro.update:
rewound, err := ro.updateFilter(update, &curStamp,
&curHeader)
if err != nil {
return err
}
if rewound {
current = false
}
default:
}
}
}
// updateFilter atomically updates the filter and rewinds to the specified
// height if not 0.
func (ro *rescanOptions) updateFilter(update *updateOptions,
curStamp *waddrmgr.BlockStamp, curHeader *wire.BlockHeader) (bool,
error) {
ro.watchAddrs = append(ro.watchAddrs,
update.addrs...)
ro.watchOutPoints = append(ro.watchOutPoints,
update.outPoints...)
ro.watchTxIDs = append(ro.watchTxIDs,
update.txIDs...)
for _, addr := range update.addrs {
ro.watchList = append(ro.watchList, addr.ScriptAddress())
}
for _, op := range update.outPoints {
ro.watchList = append(ro.watchList,
builder.OutPointToFilterEntry(op))
}
for _, txid := range update.txIDs {
ro.watchList = append(ro.watchList, txid[:])
}
// Rewind if requested
if update.rewind == 0 {
return false, nil
}
var header wire.BlockHeader
var height uint32
var rewound bool
var err error
for curStamp.Height > int32(update.rewind) {
if ro.ntfn.OnBlockDisconnected != nil {
ro.ntfn.OnBlockDisconnected(&curStamp.Hash,
curStamp.Height, curHeader.Timestamp)
}
if ro.ntfn.OnFilteredBlockDisconnected != nil {
ro.ntfn.OnFilteredBlockDisconnected(curStamp.Height,
curHeader)
}
// Don't rewind past the last block we need to disconnect,
// because otherwise we connect the last known good block
// without ever disconnecting it.
if curStamp.Height == int32(update.rewind+1) {
break
}
// Rewind and continue.
header, height, err =
ro.chain.GetBlockByHash(curHeader.PrevBlock)
if err != nil {
return rewound, err
}
*curHeader = header
curStamp.Height = int32(height)
curStamp.Hash = curHeader.BlockHash()
rewound = true
}
return rewound, nil
}
// notifyBlock notifies listeners based on the block filter. It writes back to
// the outPoints argument the updated list of outpoints to monitor based on
// matched addresses.
func (ro *rescanOptions) notifyBlock(block *btcutil.Block) ([]*btcutil.Tx,
error) {
var relevantTxs []*btcutil.Tx
blockHeader := block.MsgBlock().Header
details := btcjson.BlockDetails{
Height: block.Height(),
Hash: block.Hash().String(),
Time: blockHeader.Timestamp.Unix(),
}
for txIdx, tx := range block.Transactions() {
relevant := false
txDetails := details
txDetails.Index = txIdx
for _, hash := range ro.watchTxIDs {
if hash == *(tx.Hash()) {
relevant = true
break
}
}
for _, in := range tx.MsgTx().TxIn {
if relevant {
break
}
for _, op := range ro.watchOutPoints {
if in.PreviousOutPoint == op {
relevant = true
if ro.ntfn.OnRedeemingTx != nil {
ro.ntfn.OnRedeemingTx(tx,
&txDetails)
}
break
}
}
}
for outIdx, out := range tx.MsgTx().TxOut {
pushedData, err := txscript.PushedData(out.PkScript)
if err != nil {
continue
}
for _, addr := range ro.watchAddrs {
if relevant {
break
}
for _, data := range pushedData {
if bytes.Equal(data,
addr.ScriptAddress()) {
relevant = true
hash := tx.Hash()
outPoint := wire.OutPoint{
Hash: *hash,
Index: uint32(outIdx),
}
ro.watchOutPoints = append(
ro.watchOutPoints,
outPoint)
ro.watchList = append(
ro.watchList,
builder.OutPointToFilterEntry(
outPoint))
if ro.ntfn.OnRecvTx != nil {
ro.ntfn.OnRecvTx(tx,
&txDetails)
}
}
}
}
}
if relevant {
relevantTxs = append(relevantTxs, tx)
}
}
return relevantTxs, nil
}
// Rescan is an object that represents a long-running rescan/notification
// client with updateable filters. It's meant to be close to a drop-in
// replacement for the btcd rescan and notification functionality used in
// wallets. It only contains information about whether a goroutine is running.
type Rescan struct {
running uint32
updateChan chan<- *updateOptions
}
// NewRescan returns a rescan object that runs in another goroutine and has an
// updateable filter. It returns the long-running rescan object, and a channel
// which returns any error on termination of the rescan process.
func (s *ChainService) NewRescan(options ...RescanOption) (Rescan,
<-chan error) {
updChan := make(chan *updateOptions)
errChan := make(chan error)
rescan := Rescan{
running: 1,
updateChan: updChan,
}
go func() {
err := s.Rescan(append(options, updateChan(updChan))...)
atomic.StoreUint32(&rescan.running, 0)
errChan <- err
}()
return rescan, errChan
}
// Functional parameters for Update.
type updateOptions struct {
addrs []btcutil.Address
outPoints []wire.OutPoint
txIDs []chainhash.Hash
rewind uint32
}
// UpdateOption is a functional option argument for the Rescan.Update method.
type UpdateOption func(uo *updateOptions)
func defaultUpdateOptions() *updateOptions {
return &updateOptions{}
}
// AddAddrs adds addresses to the filter.
func AddAddrs(addrs ...btcutil.Address) UpdateOption {
return func(uo *updateOptions) {
uo.addrs = append(uo.addrs, addrs...)
}
}
// AddOutPoints adds outpoints to the filter.
func AddOutPoints(outPoints ...wire.OutPoint) UpdateOption {
return func(uo *updateOptions) {
uo.outPoints = append(uo.outPoints, outPoints...)
}
}
// AddTxIDs adds TxIDs to the filter.
func AddTxIDs(txIDs ...chainhash.Hash) UpdateOption {
return func(uo *updateOptions) {
uo.txIDs = append(uo.txIDs, txIDs...)
}
}
// Rewind rewinds the rescan to the specified height (meaning, disconnects down
// to the block immediately after the specified height) and restarts it from
// that point with the (possibly) newly expanded filter. Especially useful when
// called in the same Update() as one of the previous three options.
func Rewind(height uint32) UpdateOption {
return func(uo *updateOptions) {
uo.rewind = height
}
}
// Update sends an update to a long-running rescan/notification goroutine.
func (r *Rescan) Update(options ...UpdateOption) error {
running := atomic.LoadUint32(&r.running)
if running != 1 {
return fmt.Errorf("Rescan is already done and cannot be " +
"updated.")
}
uo := defaultUpdateOptions()
for _, option := range options {
option(uo)
}
r.updateChan <- uo
return nil
}
// GetUtxo gets the appropriate TxOut or errors if it's spent. The option
// WatchOutPoints (with a single outpoint) is required. StartBlock can be used
// to give a hint about which block the transaction is in, and TxIdx can be used
// to give a hint of which transaction in the block matches it (coinbase is 0,
// first normal transaction is 1, etc.).
func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut,
*wire.MsgTx, error) {
ro := defaultRescanOptions()
ro.startBlock = &waddrmgr.BlockStamp{
Hash: *s.chainParams.GenesisHash,
Height: 0,
}
for _, option := range options {
option(ro)
}
if len(ro.watchOutPoints) != 1 {
return nil, nil, fmt.Errorf("Must pass exactly one OutPoint.")
}
watchList := [][]byte{
builder.OutPointToFilterEntry(ro.watchOutPoints[0]),
ro.watchOutPoints[0].Hash[:],
}
// Track our position in the chain.
curHeader, curHeight, err := s.LatestBlock()
curStamp := &waddrmgr.BlockStamp{
Hash: curHeader.BlockHash(),
Height: int32(curHeight),
}
if err != nil {
return nil, nil, err
}
// Find our earliest possible block.
if (ro.startBlock.Hash != chainhash.Hash{}) {
_, height, err := s.GetBlockByHash(ro.startBlock.Hash)
if err == nil {
ro.startBlock.Height = int32(height)
} else {
ro.startBlock.Hash = chainhash.Hash{}
}
}
if (ro.startBlock.Hash == chainhash.Hash{}) {
if ro.startBlock.Height == 0 {
ro.startBlock.Hash = *s.chainParams.GenesisHash
} else {
header, err := s.GetBlockByHeight(
uint32(ro.startBlock.Height))
if err == nil {
ro.startBlock.Hash = header.BlockHash()
} else {
ro.startBlock.Hash = *s.chainParams.GenesisHash
ro.startBlock.Height = 0
}
}
}
log.Tracef("Starting scan for output spend from known block %d (%s) "+
"back to block %d (%s)", curStamp.Height, curStamp.Hash,
ro.startBlock.Height, ro.startBlock.Hash)
for {
// Check the basic filter for the spend and the extended filter
// for the transaction in which the outpout is funded.
filter, err := s.GetCFilter(curStamp.Hash, false,
ro.queryOptions...)
if err != nil {
return nil, nil, fmt.Errorf("Couldn't get basic "+
"filter for block %d (%s)", curStamp.Height,
curStamp.Hash)
}
matched := false
if filter != nil {
matched, err = filter.MatchAny(builder.DeriveKey(
&curStamp.Hash), watchList)
}
if err != nil {
return nil, nil, err
}
if !matched {
filter, err = s.GetCFilter(curStamp.Hash, true,
ro.queryOptions...)
if err != nil {
return nil, nil, fmt.Errorf("Couldn't get "+
"extended filter for block %d (%s)",
curStamp.Height, curStamp.Hash)
}
if filter != nil {
matched, err = filter.MatchAny(
builder.DeriveKey(&curStamp.Hash),
watchList)
}
}
// If either is matched, download the block and check to see
// what we have.
if matched {
block, err := s.GetBlockFromNetwork(curStamp.Hash,
ro.queryOptions...)
if err != nil {
return nil, nil, err
}
if block == nil {
return nil, nil, fmt.Errorf("Couldn't get "+
"block %d (%s)", curStamp.Height,
curStamp.Hash)
}
// If we've spent the output in this block, return an
// error stating that the output is spent.
for _, tx := range block.Transactions() {
for _, ti := range tx.MsgTx().TxIn {
if ti.PreviousOutPoint ==
ro.watchOutPoints[0] {
return nil, tx.MsgTx(), nil
}
}
}
// If we found the transaction that created the output,
// then it's not spent and we can return the TxOut.
for _, tx := range block.Transactions() {
if *(tx.Hash()) ==
ro.watchOutPoints[0].Hash {
return tx.MsgTx().
TxOut[ro.watchOutPoints[0].
Index], nil, nil
}
}
}
// Otherwise, iterate backwards until we've gone too
// far.
curStamp.Height--
if curStamp.Height < ro.startBlock.Height {
return nil, nil, fmt.Errorf("Couldn't find "+
"transaction %s",
ro.watchOutPoints[0].Hash)
}
header, err := s.GetBlockByHeight(
uint32(curStamp.Height))
if err != nil {
return nil, nil, err
}
curStamp.Hash = header.BlockHash()
}
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,39 +0,0 @@
package spvsvc
import "github.com/btcsuite/btcwallet/spvsvc/spvchain"
// SynchronizationService provides an SPV, p2p-based backend for a wallet to
// synchronize it with the network and send transactions it signs.
type SynchronizationService struct {
chain spvchain.ChainService
}
// SynchronizationServiceOpt is the return type of functional options for
// creating a SynchronizationService object.
type SynchronizationServiceOpt func(*SynchronizationService) error
// NewSynchronizationService creates a new SynchronizationService with
// functional options.
func NewSynchronizationService(opts ...SynchronizationServiceOpt) (*SynchronizationService, error) {
s := SynchronizationService{
//userAgentName: defaultUserAgentName,
//userAgentVersion: defaultUserAgentVersion,
}
for _, opt := range opts {
err := opt(&s)
if err != nil {
return nil, err
}
}
return &s, nil
}
// UserAgent is a functional option to set the user agent information as it
// appears to other nodes.
func UserAgent(agentName, agentVersion string) SynchronizationServiceOpt {
return func(s *SynchronizationService) error {
//s.userAgentName = agentName
//s.userAgentVersion = agentVersion
return nil
}
}