SPV implementation - hodgepodge mishmash not done yet

Alex 2017-02-13 16:00:02 -07:00 committed by Olaoluwa Osuntokun
parent d27d1211c5
commit 68e02afbf4
9 changed files with 3912 additions and 0 deletions

spvsvc/log.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package spvsvc
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter is called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
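For reference, a minimal caller-side sketch (not part of this commit) of enabling package logging through UseLogger; the package name examplewallet and the source of the btclog.Logger are assumptions:

package examplewallet

import (
	"github.com/btcsuite/btclog"
	"github.com/btcsuite/btcwallet/spvsvc"
)

// EnableSpvsvcLogging hands an existing btclog.Logger, obtained from the
// application's own logging setup, to the spvsvc package, which otherwise
// performs no logging (see DisableLog above).
func EnableSpvsvcLogging(appLog btclog.Logger) {
	spvsvc.UseLogger(appLog)
}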


@@ -0,0 +1,76 @@
package spvchain
import (
"sync"
"time"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil"
)
// blockProgressLogger provides periodic logging for other services in order
// to show users progress of certain "actions" involving some or all current
// blocks. Ex: syncing to best chain, indexing all blocks, etc.
type blockProgressLogger struct {
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
subsystemLogger btclog.Logger
progressAction string
sync.Mutex
}
// newBlockProgressLogger returns a new block progress logger.
// The progress message is templated as follows:
// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod}
// ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp})
func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger {
return &blockProgressLogger{
lastBlockLogTime: time.Now(),
progressAction: progressMessage,
subsystemLogger: logger,
}
}
// LogBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) {
b.Lock()
defer b.Unlock()
b.receivedLogBlocks++
b.receivedLogTx += int64(len(block.MsgBlock().Transactions))
now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if duration < time.Second*10 {
return
}
// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)",
b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx,
txStr, block.Height(), block.MsgBlock().Header.Timestamp)
b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}
func (b *blockProgressLogger) SetLastLogTime(time time.Time) {
b.lastBlockLogTime = time
}
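As an illustration only (not part of this commit), a block-processing loop in this package might drive the progress logger as follows; the function and variable names are hypothetical:

// logSyncProgress is a hypothetical example of reporting progress while
// processing a batch of blocks: at most one summary line is logged every
// 10 seconds, no matter how many blocks are handled.
func logSyncProgress(blocks []*btcutil.Block) {
	progress := newBlockProgressLogger("Processed", log)
	for _, block := range blocks {
		progress.LogBlockHeight(block)
	}
}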

File diff suppressed because it is too large.

spvsvc/spvchain/db.go (new file, 429 lines)

@@ -0,0 +1,429 @@
package spvchain
import (
"bytes"
"encoding/binary"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
)
const (
// LatestDBVersion is the most recent database version.
LatestDBVersion = 1
)
var (
// latestDBVersion is the most recent database version as a variable so
// the tests can change it to force errors.
latestDBVersion uint32 = LatestDBVersion
)
// Key names for various database fields.
var (
// Bucket names.
spvBucketName = []byte("spv")
blockHeaderBucketName = []byte("bh")
basicHeaderBucketName = []byte("bfh")
basicFilterBucketName = []byte("bf")
extHeaderBucketName = []byte("efh")
extFilterBucketName = []byte("ef")
// Db related key names (main bucket).
dbVersionName = []byte("dbver")
dbCreateDateName = []byte("dbcreated")
maxBlockHeightName = []byte("maxblockheight")
)
// uint32ToBytes converts a 32 bit unsigned integer into a 4-byte slice in
// little-endian order: 1 -> [1 0 0 0].
func uint32ToBytes(number uint32) []byte {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, number)
return buf
}
// uint64ToBytes converts a 64 bit unsigned integer into an 8-byte slice in
// little-endian order: 1 -> [1 0 0 0 0 0 0 0].
func uint64ToBytes(number uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, number)
return buf
}
// fetchDBVersion fetches the current database version.
func fetchDBVersion(tx walletdb.Tx) (uint32, error) {
bucket := tx.RootBucket().Bucket(spvBucketName)
verBytes := bucket.Get(dbVersionName)
if verBytes == nil {
str := "required version number not stored in database"
return 0, log.Error(str)
}
version := binary.LittleEndian.Uint32(verBytes)
return version, nil
}
// putDBVersion stores the provided version to the database.
func putDBVersion(tx walletdb.Tx, version uint32) error {
bucket := tx.RootBucket().Bucket(spvBucketName)
verBytes := uint32ToBytes(version)
err := bucket.Put(dbVersionName, verBytes)
if err != nil {
str := "failed to store version: %v"
return log.Errorf(str, err)
}
return nil
}
// putMaxBlockHeight stores the max block height to the database.
func putMaxBlockHeight(tx walletdb.Tx, maxBlockHeight uint32) error {
bucket := tx.RootBucket().Bucket(spvBucketName)
maxBlockHeightBytes := uint32ToBytes(maxBlockHeight)
err := bucket.Put(maxBlockHeightName, maxBlockHeightBytes)
if err != nil {
str := "failed to store max block height: %v"
return log.Errorf(str, err)
}
return nil
}
// putBlock stores the provided block header and height, keyed to the block
// hash, in the database.
func putBlock(tx walletdb.Tx, header wire.BlockHeader, height uint32) error {
var buf bytes.Buffer
err := header.Serialize(&buf)
if err != nil {
return err
}
_, err = buf.Write(uint32ToBytes(height))
if err != nil {
return err
}
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName)
blockHash := header.BlockHash()
err = bucket.Put(blockHash[:], buf.Bytes())
if err != nil {
str := "failed to store SPV block info: %v"
return log.Errorf(str, err)
}
err = bucket.Put(uint32ToBytes(height), blockHash[:])
if err != nil {
str := "failed to store block height info: %v"
return log.Errorf(str, err)
}
return nil
}
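// Note (illustrative, not part of the original commit): the value stored
// under the block hash key above is the 80-byte serialized header followed
// by the 4-byte little-endian height; GetBlockByHash below splits the record
// at wire.MaxBlockHeaderPayload (80 bytes) to recover both pieces. The second
// Put maps the 4-byte height key back to the 32-byte block hash.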
// putFilter stores the provided filter, keyed to the block hash, in the
// appropriate filter bucket in the database.
func putFilter(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte,
filter *gcs.Filter) error {
var buf bytes.Buffer
_, err := buf.Write(filter.NBytes())
if err != nil {
return err
}
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(bucketName)
err = bucket.Put(blockHash[:], buf.Bytes())
if err != nil {
str := "failed to store filter: %v"
return log.Errorf(str, err)
}
return nil
}
// putBasicFilter stores the provided filter, keyed to the block hash, in the
// basic filter bucket in the database.
func putBasicFilter(tx walletdb.Tx, blockHash chainhash.Hash,
filter *gcs.Filter) error {
return putFilter(tx, blockHash, basicFilterBucketName, filter)
}
// putExtFilter stores the provided filter, keyed to the block hash, in the
// extended filter bucket in the database.
func putExtFilter(tx walletdb.Tx, blockHash chainhash.Hash,
filter *gcs.Filter) error {
return putFilter(tx, blockHash, extFilterBucketName, filter)
}
// putHeader stores the provided filter header, keyed to the block hash, in
// the appropriate filter header bucket in the database.
func putHeader(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte,
filterTip chainhash.Hash) error {
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(bucketName)
err := bucket.Put(blockHash[:], filterTip[:])
if err != nil {
str := "failed to store filter header: %v"
return log.Errorf(str, err)
}
return nil
}
// putBasicHeader stores the provided filter header, keyed to the block hash,
// in the basic filter header bucket in the database.
func putBasicHeader(tx walletdb.Tx, blockHash chainhash.Hash,
filterTip chainhash.Hash) error {
return putHeader(tx, blockHash, basicHeaderBucketName, filterTip)
}
// putExtHeader stores the provided filter header, keyed to the block hash,
// in the extended filter header bucket in the database.
func putExtHeader(tx walletdb.Tx, blockHash chainhash.Hash,
filterTip chainhash.Hash) error {
return putHeader(tx, blockHash, extHeaderBucketName, filterTip)
}
// GetBlockByHash retrieves the block header and height, based on the
// provided block hash, from the database.
func GetBlockByHash(tx walletdb.Tx, blockHash chainhash.Hash) (wire.BlockHeader,
uint32, error) {
//chainhash.Hash, chainhash.Hash,
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName)
blockBytes := bucket.Get(blockHash[:])
if len(blockBytes) == 0 {
str := "failed to retrieve block info for hash: %s"
return wire.BlockHeader{}, 0, log.Errorf(str, blockHash)
}
buf := bytes.NewReader(blockBytes[:wire.MaxBlockHeaderPayload])
var header wire.BlockHeader
err := header.Deserialize(buf)
if err != nil {
str := "failed to deserialize block header for hash: %s"
return wire.BlockHeader{}, 0, log.Errorf(str, blockHash)
}
height := binary.LittleEndian.Uint32(blockBytes[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4])
return header, height, nil
}
// GetBlockHashByHeight retrieves the hash of a block by its height.
func GetBlockHashByHeight(tx walletdb.Tx, height uint32) (chainhash.Hash,
error) {
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName)
var hash chainhash.Hash
hashBytes := bucket.Get(uint32ToBytes(height))
if hashBytes == nil {
str := "no block hash for height %v"
return hash, log.Errorf(str, height)
}
hash.SetBytes(hashBytes)
return hash, nil
}
// GetBlockByHeight retrieves a block's information by its height.
func GetBlockByHeight(tx walletdb.Tx, height uint32) (wire.BlockHeader, uint32,
error) {
// chainhash.Hash, chainhash.Hash
blockHash, err := GetBlockHashByHeight(tx, height)
if err != nil {
return wire.BlockHeader{}, 0, err
}
return GetBlockByHash(tx, blockHash)
}
// SyncedTo retrieves the most recent block's height and hash.
func SyncedTo(tx walletdb.Tx) (*waddrmgr.BlockStamp, error) {
header, height, err := LatestBlock(tx)
if err != nil {
return nil, err
}
var blockStamp waddrmgr.BlockStamp
blockStamp.Hash = header.BlockHash()
blockStamp.Height = int32(height)
return &blockStamp, nil
}
// LatestBlock retrieves all the info about the latest stored block.
func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) {
bucket := tx.RootBucket().Bucket(spvBucketName)
maxBlockHeightBytes := bucket.Get(maxBlockHeightName)
if maxBlockHeightBytes == nil {
str := "no max block height stored"
return wire.BlockHeader{}, 0, log.Error(str)
}
maxBlockHeight := binary.LittleEndian.Uint32(maxBlockHeightBytes)
header, height, err := GetBlockByHeight(tx, maxBlockHeight)
if err != nil {
return wire.BlockHeader{}, 0, err
}
if height != maxBlockHeight {
str := "max block height inconsistent"
return wire.BlockHeader{}, 0, log.Error(str)
}
return header, height, nil
}
// BlockLocatorFromHash returns a block locator based on the provided hash.
func BlockLocatorFromHash(tx walletdb.Tx, hash chainhash.Hash) blockchain.BlockLocator {
locator := make(blockchain.BlockLocator, 0, wire.MaxBlockLocatorsPerMsg)
locator = append(locator, &hash)
// If hash isn't found in DB or this is the genesis block, return
// the locator as is
_, height, err := GetBlockByHash(tx, hash)
if (err != nil) || (height == 0) {
return locator
}
decrement := uint32(1)
for (height > 0) && (len(locator) < wire.MaxBlockLocatorsPerMsg) {
// Decrement by 1 for the first 10 blocks, then double the
// jump until we get to the genesis hash
if len(locator) > 10 {
decrement *= 2
}
if decrement > height {
height = 0
} else {
height -= decrement
}
blockHash, err := GetBlockHashByHeight(tx, height)
if err != nil {
return locator
}
locator = append(locator, &blockHash)
}
return locator
}
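// For example (illustrative note, not from the original commit): a locator
// built from a block at height 1000 references heights 1000, 999, ..., 990,
// then 988, 984, 976, 960, 928, 864, 736, 480, and finally 0 (genesis).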
// createSPVNS creates the initial namespace structure needed for all of the
// SPV-related data. This includes things such as all of the buckets as well as
// the version and creation date.
func createSPVNS(namespace walletdb.Namespace, params *chaincfg.Params) error {
err := namespace.Update(func(tx walletdb.Tx) error {
rootBucket := tx.RootBucket()
spvBucket, err := rootBucket.CreateBucketIfNotExists(spvBucketName)
if err != nil {
str := "failed to create main bucket: %v"
return log.Errorf(str, err)
}
_, err = spvBucket.CreateBucketIfNotExists(blockHeaderBucketName)
if err != nil {
str := "failed to create block header bucket: %v"
return log.Errorf(str, err)
}
_, err = spvBucket.CreateBucketIfNotExists(basicFilterBucketName)
if err != nil {
str := "failed to create basic filter bucket: %v"
return log.Errorf(str, err)
}
_, err = spvBucket.CreateBucketIfNotExists(basicHeaderBucketName)
if err != nil {
str := "failed to create basic header bucket: %v"
return log.Errorf(str, err)
}
_, err = spvBucket.CreateBucketIfNotExists(extFilterBucketName)
if err != nil {
str := "failed to create extended filter bucket: %v"
return log.Errorf(str, err)
}
_, err = spvBucket.CreateBucketIfNotExists(extHeaderBucketName)
if err != nil {
str := "failed to create extended header bucket: %v"
return log.Errorf(str, err)
}
createDate := spvBucket.Get(dbCreateDateName)
if createDate != nil {
log.Infof("Wallet SPV namespace already created.")
return nil
}
log.Infof("Creating wallet SPV namespace.")
basicFilter, err := buildBasicFilter(params.GenesisBlock)
if err != nil {
return err
}
basicFilterTip := makeHeaderForFilter(basicFilter,
params.GenesisBlock.Header.PrevBlock)
extFilter, err := buildExtFilter(params.GenesisBlock)
if err != nil {
return err
}
extFilterTip := makeHeaderForFilter(extFilter,
params.GenesisBlock.Header.PrevBlock)
err = putBlock(tx, params.GenesisBlock.Header, 0)
if err != nil {
return err
}
err = putBasicFilter(tx, *params.GenesisHash, basicFilter)
if err != nil {
return err
}
err = putBasicHeader(tx, *params.GenesisHash, basicFilterTip)
if err != nil {
return err
}
err = putExtFilter(tx, *params.GenesisHash, extFilter)
if err != nil {
return err
}
err = putExtHeader(tx, *params.GenesisHash, extFilterTip)
if err != nil {
return err
}
err = putDBVersion(tx, latestDBVersion)
if err != nil {
return err
}
err = putMaxBlockHeight(tx, 0)
if err != nil {
return err
}
err = spvBucket.Put(dbCreateDateName,
uint64ToBytes(uint64(time.Now().Unix())))
if err != nil {
str := "failed to store database creation time: %v"
return log.Errorf(str, err)
}
return nil
})
if err != nil {
str := "failed to update database: %v"
return log.Errorf(str, err)
}
return nil
}

spvsvc/spvchain/filter.go (new file, 73 lines)

@@ -0,0 +1,73 @@
package spvchain
import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
)
func buildBasicFilter(block *wire.MsgBlock) (*gcs.Filter, error) {
blockHash := block.BlockHash()
b := builder.WithKeyHash(&blockHash)
_, err := b.Key()
if err != nil {
str := "failed to create filter builder: %v"
return nil, log.Errorf(str, err)
}
for i, tx := range block.Transactions {
// Skip the inputs for the coinbase transaction
if i != 0 {
for _, txIn := range tx.TxIn {
b.AddOutPoint(txIn.PreviousOutPoint)
}
}
for _, txOut := range tx.TxOut {
b.AddScript(txOut.PkScript)
}
}
f, err := b.Build()
if err != nil {
str := "failed to build filter: %v"
return nil, log.Errorf(str, err)
}
return f, nil
}
func buildExtFilter(block *wire.MsgBlock) (*gcs.Filter, error) {
blockHash := block.BlockHash()
b := builder.WithKeyHash(&blockHash)
_, err := b.Key()
if err != nil {
str := "failed to create filter builder: %v"
return nil, log.Errorf(str, err)
}
for i, tx := range block.Transactions {
txHash := tx.TxHash()
b.AddHash(&txHash)
// Skip the inputs for the coinbase transaction
if i != 0 {
for _, txIn := range tx.TxIn {
b.AddScript(txIn.SignatureScript)
}
}
}
f, err := b.Build()
if err != nil {
str := "failed to build filter: %v"
return nil, log.Errorf(str, err)
}
return f, nil
}
func getFilterHash(filter *gcs.Filter) chainhash.Hash {
return chainhash.HashH(filter.NBytes())
}
func makeHeaderForFilter(filter *gcs.Filter, prevHeader chainhash.Hash) chainhash.Hash {
filterTip := make([]byte, 2*chainhash.HashSize)
filterHash := getFilterHash(filter)
copy(filterTip, filterHash[:])
copy(filterTip[chainhash.HashSize:], prevHeader[:])
return chainhash.HashH(filterTip)
}
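To make the header chaining explicit, here is a hypothetical same-package helper (not part of this commit) mirroring what createSPVNS in db.go does for the genesis block:

// chainFilterHeaders computes the filter headers for two consecutive blocks,
// given their filters and the first block's PrevBlock hash. Each header
// hashes the filter's own hash together with the previous header, so every
// header commits to the entire filter history before it.
func chainFilterHeaders(f0, f1 *gcs.Filter, prevBlock chainhash.Hash) (chainhash.Hash, chainhash.Hash) {
	h0 := makeHeaderForFilter(f0, prevBlock)
	h1 := makeHeaderForFilter(f1, h0)
	return h0, h1
}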

spvsvc/spvchain/log.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package spvchain
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter is called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}


@@ -0,0 +1,149 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package spvchain
import (
"errors"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/connmgr"
)
type getConnCountMsg struct {
reply chan int32
}
type getPeersMsg struct {
reply chan []*serverPeer
}
type getOutboundGroup struct {
key string
reply chan int
}
type getAddedNodesMsg struct {
reply chan []*serverPeer
}
type disconnectNodeMsg struct {
cmp func(*serverPeer) bool
reply chan error
}
type connectNodeMsg struct {
addr string
permanent bool
reply chan error
}
type removeNodeMsg struct {
cmp func(*serverPeer) bool
reply chan error
}
// handleQuery is the central handler for all queries and commands from other
// goroutines related to peer state.
func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) {
switch msg := querymsg.(type) {
case getConnCountMsg:
nconnected := int32(0)
state.forAllPeers(func(sp *serverPeer) {
if sp.Connected() {
nconnected++
}
})
msg.reply <- nconnected
case getPeersMsg:
peers := make([]*serverPeer, 0, state.Count())
state.forAllPeers(func(sp *serverPeer) {
if !sp.Connected() {
return
}
peers = append(peers, sp)
})
msg.reply <- peers
case connectNodeMsg:
// TODO: duplicate oneshots?
// Limit max number of total peers.
if state.Count() >= MaxPeers {
msg.reply <- errors.New("max peers reached")
return
}
for _, peer := range state.persistentPeers {
if peer.Addr() == msg.addr {
if msg.permanent {
msg.reply <- errors.New("peer already connected")
} else {
msg.reply <- errors.New("peer exists as a permanent peer")
}
return
}
}
netAddr, err := addrStringToNetAddr(msg.addr)
if err != nil {
msg.reply <- err
return
}
// TODO: if too many, nuke a non-perm peer.
go s.connManager.Connect(&connmgr.ConnReq{
Addr: netAddr,
Permanent: msg.permanent,
})
msg.reply <- nil
case removeNodeMsg:
found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
if found {
msg.reply <- nil
} else {
msg.reply <- errors.New("peer not found")
}
case getOutboundGroup:
count, ok := state.outboundGroups[msg.key]
if ok {
msg.reply <- count
} else {
msg.reply <- 0
}
// Request a list of the persistent (added) peers.
case getAddedNodesMsg:
// Respond with a slice of the relevant peers.
peers := make([]*serverPeer, 0, len(state.persistentPeers))
for _, sp := range state.persistentPeers {
peers = append(peers, sp)
}
msg.reply <- peers
case disconnectNodeMsg:
// Check outbound peers.
found := disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
if found {
// If there are multiple outbound connections to the same
// ip:port, continue disconnecting them all until no such
// peers are found.
for found {
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
})
}
msg.reply <- nil
return
}
msg.reply <- errors.New("peer not found")
}
}
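These messages implement a request/reply pattern over channels: a caller sends a message carrying a reply channel, and the peer-handling goroutine (which lives in the suppressed spvchain.go diff) answers on it. A hypothetical caller-side accessor, assuming ChainService carries that channel in a field named query, could look like:

// ConnectedCount is a hypothetical accessor (not part of this commit): it
// sends a getConnCountMsg to the peer-handling goroutine and blocks until the
// reply arrives on the supplied channel.
func (s *ChainService) ConnectedCount() int32 {
	reply := make(chan int32)
	s.query <- getConnCountMsg{reply: reply}
	return <-reply
}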

spvsvc/spvchain/spvchain.go (new file, 1373 lines)

File diff suppressed because it is too large.

spvsvc/spvsvc.go (new file, 277 lines)

@@ -0,0 +1,277 @@
package spvsvc
import (
"fmt"
"net"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/spvsvc/spvchain"
"github.com/btcsuite/btcwallet/wallet"
)
// SynchronizationService provides an SPV, p2p-based backend for a wallet,
// keeping it synchronized with the network and broadcasting the transactions
// it signs.
type SynchronizationService struct {
wallet *wallet.Wallet
chainService spvchain.ChainService
}
// SynchronizationServiceOpt is the return type of functional options for
// creating a SynchronizationService object.
type SynchronizationServiceOpt func(*SynchronizationService) error
// NewSynchronizationService creates a new SynchronizationService with
// functional options.
func NewSynchronizationService(opts ...SynchronizationServiceOpt) (*SynchronizationService, error) {
s := SynchronizationService{
userAgentName: defaultUserAgentName,
userAgentVersion: defaultUserAgentVersion,
}
for _, opt := range opts {
err := opt(&s)
if err != nil {
return nil, err
}
}
return &s, nil
}
// UserAgent is a functional option to set the user agent information as it
// appears to other nodes.
func UserAgent(agentName, agentVersion string) SynchronizationServiceOpt {
return func(s *SynchronizationService) error {
s.userAgentName = agentName
s.userAgentVersion = agentVersion
return nil
}
}
// AddrManager is a functional option to create an address manager for the
// synchronization service. It takes a string as an argument to specify the
// directory in which to store addresses.
func AddrManager(dir string) SynchronizationServiceOpt {
return func(s *SynchronizationService) error {
m := addrmgr.New(dir, spvLookup)
s.addrManager = m
return nil
}
}
// ConnManagerOpt is the return type of functional options to create a
// connection manager for the synchronization service.
type ConnManagerOpt func(*connmgr.Config) error
// ConnManager is a functional option to create a connection manager for the
// synchronization service.
func ConnManager(opts ...ConnManagerOpt) SynchronizationServiceOpt {
return func(s *SynchronizationService) error {
c := connmgr.Config{
TargetOutbound: defaultTargetOutbound,
RetryDuration: connectionRetryInterval,
GetNewAddress: s.getNewAddress,
}
for _, opt := range opts {
err := opt(&c)
if err != nil {
return err
}
}
connManager, err := connmgr.New(&c)
if err != nil {
return err
}
s.connManager = connManager
return nil
}
}
// TargetOutbound is a functional option to specify how many outbound
// connections should be made by the ConnManager to peers. Defaults to 8.
func TargetOutbound(target uint32) ConnManagerOpt {
return func(c *connmgr.Config) error {
c.TargetOutbound = target
return nil
}
}
// RetryDuration is a functional option to specify how long to wait before
// retrying a connection request. Defaults to 5s.
func RetryDuration(duration time.Duration) ConnManagerOpt {
return func(c *connmgr.Config) error {
c.RetryDuration = duration
return nil
}
}
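// newExampleService is a hypothetical sketch (not part of this commit)
// showing how the functional options above compose; the user agent strings
// and data directory are illustrative values only.
func newExampleService() (*SynchronizationService, error) {
	return NewSynchronizationService(
		UserAgent("example-wallet", "0.0.1"),
		AddrManager("/tmp/example-wallet-data"),
		ConnManager(
			TargetOutbound(8),
			RetryDuration(5*time.Second),
		),
	)
}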
func (s *SynchronizationService) getNewAddress() (net.Addr, error) {
if s.addrManager == nil {
return nil, log.Error("Couldn't get address for new " +
"connection: address manager is nil.")
}
s.addrManager.Start()
for tries := 0; tries < 100; tries++ {
addr := s.addrManager.GetAddress()
if addr == nil {
break
}
// If we already have peers in this group, skip this address
key := addrmgr.GroupKey(addr.NetAddress())
if s.outboundGroupCount(key) != 0 {
continue
}
// For the first 30 tries, skip addresses that have been attempted
// within the last 10 minutes.
if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
continue
}
// For the first 50 tries, only consider addresses listening on the
// network's default port.
if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
s.wallet.ChainParams().DefaultPort {
continue
}
addrString := addrmgr.NetAddressKey(addr.NetAddress())
return addrStringToNetAddr(addrString)
}
return nil, log.Error("Couldn't get address for new connection: no " +
"valid addresses known.")
}
func (s *SynchronizationService) outboundGroupCount(key string) int {
replyChan := make(chan int)
s.query <- getOutboundGroup{key: key, reply: replyChan}
return <-replyChan
}
// SynchronizeWallet associates a wallet with the synchronization service,
// synchronizes the wallet with the latest changes to the blockchain, and
// continuously updates the wallet through peer notifications.
//
// Barring an error, this function does not return until the wallet is
// synchronized to the current chain state.
func (s *SynchronizationService) SynchronizeWallet(w *wallet.Wallet) error {
s.wallet = w
s.wg.Add(3)
go s.notificationQueueHandler()
go s.processQueuedNotifications()
go s.queryHandler()
return s.syncWithNetwork(w)
}
func (s *SynchronizationService) queryHandler() {
}
func (s *SynchronizationService) processQueuedNotifications() {
for n := range s.dequeueNotification {
var err error
notificationSwitch:
switch n := n.(type) {
case *wire.MsgBlock:
if n.BlockHash().String() != "" {
break notificationSwitch
}
case *wire.MsgHeaders:
case *wire.MsgInv:
case *wire.MsgReject:
}
if err != nil {
log.Errorf("Cannot handle peer notification: %v", err)
}
}
s.wg.Done()
}
// syncWithNetwork brings the wallet up to date with the current chain server
// connection. It creates a rescan request and blocks until the rescan has
// finished.
func (s *SynchronizationService) syncWithNetwork(w *wallet.Wallet) error {
/*chainClient := s.rpcClient
// Request notifications for connected and disconnected blocks.
//
// TODO(jrick): Either request this notification only once, or when
// btcrpcclient is modified to allow some notification request to not
// automatically resent on reconnect, include the notifyblocks request
// as well. I am leaning towards allowing off all btcrpcclient
// notification re-registrations, in which case the code here should be
// left as is.
err := chainClient.NotifyBlocks()
if err != nil {
return err
}
// Request notifications for transactions sending to all wallet
// addresses.
addrs, unspent, err := w.ActiveData()
if err != nil {
return err
}
// TODO(jrick): How should this handle a synced height earlier than
// the chain server best block?
// When no addresses have been generated for the wallet, the rescan can
// be skipped.
//
// TODO: This is only correct because activeData above returns all
// addresses ever created, including those that don't need to be watched
// anymore. This code should be updated when this assumption is no
// longer true, but worst case would result in an unnecessary rescan.
if len(addrs) == 0 && len(unspent) == 0 {
// TODO: It would be ideal if on initial sync wallet saved the
// last several recent blocks rather than just one. This would
// avoid a full rescan for a one block reorg of the current
// chain tip.
hash, height, err := chainClient.GetBestBlock()
if err != nil {
return err
}
return w.Manager.SetSyncedTo(&waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
})
}
// Compare previously-seen blocks against the chain server. If any of
// these blocks no longer exist, rollback all of the missing blocks
// before catching up with the rescan.
iter := w.Manager.NewIterateRecentBlocks()
rollback := iter == nil
syncBlock := waddrmgr.BlockStamp{
Hash: *w.ChainParams().GenesisHash,
Height: 0,
}
for cont := iter != nil; cont; cont = iter.Prev() {
bs := iter.BlockStamp()
log.Debugf("Checking for previous saved block with height %v hash %v",
bs.Height, bs.Hash)
_, err = chainClient.GetBlock(&bs.Hash)
if err != nil {
rollback = true
continue
}
log.Debug("Found matching block.")
syncBlock = bs
break
}
if rollback {
err = w.Manager.SetSyncedTo(&syncBlock)
if err != nil {
return err
}
// Rollback unconfirms transactions at and beyond the passed
// height, so add one to the new synced-to height to prevent
// unconfirming txs from the synced-to block.
err = w.TxStore.Rollback(syncBlock.Height + 1)
if err != nil {
return err
}
}
return s.initialRescan(addrs, unspent, w.Manager.SyncedTo()) */
return nil
}