Support mempool transaction notifications.

This commit is contained in:
Josh Rickmar 2013-12-17 13:18:09 -05:00
parent 399f91bba2
commit 614ed93a1d
6 changed files with 511 additions and 97 deletions

View file

@ -644,6 +644,11 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
log.Error("Tx Handler: Unspecified receiver.")
return false
}
receiverHash, _, err := btcutil.DecodeAddress(receiver)
if err != nil {
log.Errorf("Tx Handler: receiver address can not be decoded: %v", err)
return false
}
height, ok := v["height"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified height.")
@ -654,6 +659,11 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
log.Error("Tx Handler: Unspecified block hash.")
return false
}
blockHash, err := btcwire.NewShaHashFromStr(blockHashBE)
if err != nil {
log.Errorf("Tx Handler: Block hash string cannot be parsed: %v", err)
return false
}
fblockIndex, ok := v["blockindex"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified block index.")
@ -671,12 +681,17 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
log.Error("Tx Handler: Unspecified transaction hash.")
return false
}
txID, err := btcwire.NewShaHashFromStr(txhashBE)
if err != nil {
log.Errorf("Tx Handler: Tx hash string cannot be parsed: %v", err)
return false
}
ftxOutIndex, ok := v["txoutindex"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified transaction output index.")
return false
}
txOutIndex := int32(ftxOutIndex)
txOutIndex := uint32(ftxOutIndex)
amt, ok := v["amount"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified amount.")
@ -693,30 +708,22 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
spent = tspent
}
// btcd sends the block and tx hashes as BE strings. Convert both
// to a LE ShaHash.
blockHash, err := btcwire.NewShaHashFromStr(blockHashBE)
if err != nil {
log.Errorf("Tx Handler: Block hash string cannot be parsed: %v", err)
return false
}
txID, err := btcwire.NewShaHashFromStr(txhashBE)
if err != nil {
log.Errorf("Tx Handler: Tx hash string cannot be parsed: %v", err)
return false
}
receiverHash, _, err := btcutil.DecodeAddress(receiver)
if err != nil {
log.Errorf("Tx Handler: receiver address can not be decoded: %v", err)
return false
if int32(height) != -1 {
worker := NotifyBalanceWorker{
block: *blockHash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.add <- worker
wg := <-worker.wg
defer func() {
wg.Done()
}()
}
// Add to TxStore.
//
// TODO(jrick): check for duplicates. This could occur if we're
// adding txs for an out of sync btcd on its IBD.
// Create RecvTx to add to tx history.
t := &tx.RecvTx{
TxID: *txID,
TxOutIdx: txOutIndex,
TimeReceived: time.Now().Unix(),
BlockHeight: int32(height),
BlockHash: *blockHash,
@ -726,44 +733,49 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
ReceiverHash: receiverHash,
}
// For transactions originating from this wallet, the sent tx history should
// be recorded before the received history. If wallet created this tx, wait
// for the sent history to finish being recorded before continuing.
req := SendTxHistSyncRequest{
txid: *txID,
response: make(chan SendTxHistSyncResponse),
}
SendTxHistSyncChans.access <- req
resp := <-req.response
if resp.ok {
// Wait until send history has been recorded.
<-resp.c
SendTxHistSyncChans.remove <- *txID
}
// Actually record the tx history.
a.TxStore.Lock()
txs := a.TxStore.s
a.TxStore.s = append(txs, t)
a.TxStore.s.InsertRecvTx(t)
a.TxStore.dirty = true
a.TxStore.Unlock()
// Notify frontends of new tx.
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(),
int32(height), a.Wallet.Net()))
// Notify frontends of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint
// has already been notified and is now in a block, a txmined notifiction
// should be sent once to let frontends that all previous send/recvs
// for this unconfirmed tx are now confirmed.
recvTxOP := btcwire.NewOutPoint(txID, txOutIndex)
previouslyNotifiedReq := NotifiedRecvTxRequest{
op: *recvTxOP,
response: make(chan NotifiedRecvTxResponse),
}
NotifiedRecvTxChans.access <- previouslyNotifiedReq
if <-previouslyNotifiedReq.response {
NotifyMinedTx <- t
NotifiedRecvTxChans.remove <- *recvTxOP
} else {
// Notify frontends of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- *recvTxOP
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(),
int32(height), a.Wallet.Net()))
}
if !spent {
// First, iterate through all stored utxos. If an unconfirmed utxo
// (not present in a block) has the same outpoint as this utxo,
// update the block height and hash.
a.UtxoStore.RLock()
for _, u := range a.UtxoStore.s {
if bytes.Equal(u.Out.Hash[:], txID[:]) && u.Out.Index == uint32(txOutIndex) {
// Found a either a duplicate, or a change UTXO. If not change,
// ignore it.
a.UtxoStore.RUnlock()
if u.Height != -1 {
return false
}
a.UtxoStore.Lock()
copy(u.BlockHash[:], blockHash[:])
u.Height = int32(height)
a.UtxoStore.dirty = true
a.UtxoStore.Unlock()
return false
}
}
a.UtxoStore.RUnlock()
// After iterating through all UTXOs, it was not a duplicate or
// change UTXO appearing in a block. Append a new Utxo to the end.
u := &tx.Utxo{
Amt: uint64(amt),
Height: int32(height),
@ -774,13 +786,18 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo
copy(u.AddrHash[:], receiverHash)
copy(u.BlockHash[:], blockHash[:])
a.UtxoStore.Lock()
a.UtxoStore.s = append(a.UtxoStore.s, u)
a.UtxoStore.s.Insert(u)
a.UtxoStore.dirty = true
a.UtxoStore.Unlock()
// If this notification came from mempool (TODO: currently
// unimplemented) notify the new unconfirmed balance immediately.
// Otherwise, wait until the blockconnection notifiation is processed.
// If this notification came from mempool, notify frontends of
// the new unconfirmed balance immediately. Otherwise, wait until
// the blockconnected notifiation is processed.
if u.Height == -1 {
bal := a.CalculateBalance(0) - a.CalculateBalance(1)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster,
a.name, bal)
}
}
// Never remove this handler.

13
cmd.go
View file

@ -239,6 +239,19 @@ func main() {
// Begin generating new IDs for JSON calls.
go JSONIDGenerator(NewJSONID)
// Begin maintanence goroutines.
go SendBeforeReceiveHistorySync(SendTxHistSyncChans.add,
SendTxHistSyncChans.done,
SendTxHistSyncChans.remove,
SendTxHistSyncChans.access)
go StoreNotifiedMempoolRecvTxs(NotifiedRecvTxChans.add,
NotifiedRecvTxChans.remove,
NotifiedRecvTxChans.access)
go NotifyMinedTxSender(NotifyMinedTx)
go NotifyBalanceSyncer(NotifyBalanceSyncerChans.add,
NotifyBalanceSyncerChans.remove,
NotifyBalanceSyncerChans.access)
for {
replies := make(chan error)
done := make(chan int)

212
cmdmgr.go
View file

@ -27,6 +27,7 @@ import (
"github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire"
"github.com/conformal/btcws"
"sync"
"time"
)
@ -636,7 +637,7 @@ func SendFrom(frontend chan []byte, icmd btcjson.Cmd) {
}
// If a change address was added, mark wallet as dirty, sync to disk,
// and Request updates for change address.
// and request updates for change address.
if len(createdTx.changeAddr) != 0 {
a.dirty = true
if err := a.writeDirtyToDisk(); err != nil {
@ -751,6 +752,10 @@ func SendMany(frontend chan []byte, icmd btcjson.Cmd) {
return
}
// Mark txid as having send history so handlers adding receive history
// wait until all send history has been written.
SendTxHistSyncChans.add <- createdTx.txid
// Set up a reply handler to respond to the btcd reply.
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
@ -763,6 +768,61 @@ func SendMany(frontend chan []byte, icmd btcjson.Cmd) {
btcdMsgs <- m
}
// Channels to manage SendBeforeReceiveHistorySync.
var SendTxHistSyncChans = struct {
add, done, remove chan btcwire.ShaHash
access chan SendTxHistSyncRequest
}{
add: make(chan btcwire.ShaHash),
remove: make(chan btcwire.ShaHash),
done: make(chan btcwire.ShaHash),
access: make(chan SendTxHistSyncRequest),
}
// SendTxHistSyncRequest requests a SendTxHistSyncResponse from
// SendBeforeReceiveHistorySync.
type SendTxHistSyncRequest struct {
txid btcwire.ShaHash
response chan SendTxHistSyncResponse
}
// SendTxHistSyncResponse is the response
type SendTxHistSyncResponse struct {
c chan struct{}
ok bool
}
// SendBeforeReceiveHistorySync manages a set of transaction hashes
// created by this wallet. For each newly added txid, a channel is
// created. Once the send history has been recorded, the txid should
// be messaged across done, causing the internal channel to be closed.
// Before receive history is recorded, access should be used to check
// if there are or were any goroutines writing send history, and if
// so, wait until the channel is closed after a done message.
func SendBeforeReceiveHistorySync(add, done, remove chan btcwire.ShaHash,
access chan SendTxHistSyncRequest) {
m := make(map[btcwire.ShaHash]chan struct{})
for {
select {
case txid := <-add:
m[txid] = make(chan struct{})
case txid := <-remove:
delete(m, txid)
case txid := <-done:
if c, ok := m[txid]; ok {
close(c)
}
case req := <-access:
c, ok := m[req.txid]
req.response <- SendTxHistSyncResponse{c: c, ok: ok}
}
}
}
func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd,
result interface{}, e *btcjson.Error, a *Account,
txInfo *CreatedTx) bool {
@ -770,6 +830,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd,
if e != nil {
log.Errorf("Could not send tx: %v", e.Message)
ReplyError(frontend, icmd.Id(), e)
SendTxHistSyncChans.remove <- txInfo.txid
return true
}
@ -780,6 +841,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd,
Message: "Unexpected type from btcd reply",
}
ReplyError(frontend, icmd.Id(), e)
SendTxHistSyncChans.remove <- txInfo.txid
return true
}
txID, err := btcwire.NewShaHashFromStr(txIDStr)
@ -789,6 +851,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd,
Message: "Invalid hash string from btcd reply",
}
ReplyError(frontend, icmd.Id(), e)
SendTxHistSyncChans.remove <- txInfo.txid
return true
}
@ -814,17 +877,13 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd,
}
}
// Signal that received notifiations are ok to add now.
SendTxHistSyncChans.done <- txInfo.txid
// Remove previous unspent outputs now spent by the tx.
a.UtxoStore.Lock()
modified := a.UtxoStore.s.Remove(txInfo.inputs)
a.UtxoStore.dirty = a.UtxoStore.dirty || modified
// Add unconfirmed change utxo (if any) to UtxoStore.
if txInfo.changeUtxo != nil {
a.UtxoStore.s = append(a.UtxoStore.s, txInfo.changeUtxo)
a.ReqSpentUtxoNtfn(txInfo.changeUtxo)
a.UtxoStore.dirty = true
}
a.UtxoStore.Unlock()
// Disk sync tx and utxo stores.
@ -1111,3 +1170,140 @@ func NotifyNewTxDetails(frontend chan []byte, account string,
mntfn, _ := ntfn.MarshalJSON()
frontend <- mntfn
}
// NotifiedRecvTxRequest is used to check whether the outpoint of
// a received transaction has already been notified due to
// arriving first in the btcd mempool.
type NotifiedRecvTxRequest struct {
op btcwire.OutPoint
response chan NotifiedRecvTxResponse
}
// NotifiedRecvTxResponse is the response of a NotifiedRecvTxRequest
// request.
type NotifiedRecvTxResponse bool
// NotifiedRecvTxChans holds the channels to manage
// StoreNotifiedMempoolTxs.
var NotifiedRecvTxChans = struct {
add, remove chan btcwire.OutPoint
access chan NotifiedRecvTxRequest
}{
add: make(chan btcwire.OutPoint),
remove: make(chan btcwire.OutPoint),
access: make(chan NotifiedRecvTxRequest),
}
// StoreNotifiedMempoolRecvTxs maintains a set of previously-sent
// received transaction notifications originating from the btcd
// mempool. This is used to prevent duplicate frontend transaction
// notifications once a mempool tx is mined into a block.
func StoreNotifiedMempoolRecvTxs(add, remove chan btcwire.OutPoint,
access chan NotifiedRecvTxRequest) {
m := make(map[btcwire.OutPoint]struct{})
for {
select {
case op := <-add:
m[op] = struct{}{}
case op := <-remove:
if _, ok := m[op]; ok {
delete(m, op)
}
case req := <-access:
_, ok := m[req.op]
req.response <- NotifiedRecvTxResponse(ok)
}
}
}
// Channel to send received transactions that were previously
// notified to frontends by the mempool. A TxMined notification
// is sent to all connected frontends detailing the block information
// about the now confirmed transaction.
var NotifyMinedTx = make(chan *tx.RecvTx)
// NotifyMinedTxSender reads received transactions from in, notifying
// frontends that the tx has now been confirmed in a block. Duplicates
// are filtered out.
func NotifyMinedTxSender(in chan *tx.RecvTx) {
// Create a map to hold a set of already notified
// txids. Do not send duplicates.
m := make(map[btcwire.ShaHash]struct{})
for recv := range in {
if _, ok := m[recv.TxID]; !ok {
ntfn := btcws.NewTxMinedNtfn(recv.TxID.String(),
recv.BlockHash.String(), recv.BlockHeight,
recv.BlockTime, int(recv.BlockIndex))
mntfn, _ := ntfn.MarshalJSON()
frontendNotificationMaster <- mntfn
// Mark as sent.
m[recv.TxID] = struct{}{}
}
}
}
// NotifyBalanceSyncerChans holds channels for accessing
// the NotifyBalanceSyncer goroutine.
var NotifyBalanceSyncerChans = struct {
add chan NotifyBalanceWorker
remove chan btcwire.ShaHash
access chan NotifyBalanceRequest
}{
add: make(chan NotifyBalanceWorker),
remove: make(chan btcwire.ShaHash),
access: make(chan NotifyBalanceRequest),
}
// NotifyBalanceWorker holds a block hash to add a worker to
// NotifyBalanceSyncer and uses a chan to returns the WaitGroup
// which should be decremented with Done after the worker is finished.
type NotifyBalanceWorker struct {
block btcwire.ShaHash
wg chan *sync.WaitGroup
}
// NotifyBalanceRequest is used by the blockconnected notification handler
// to access and wait on the the WaitGroup for workers currently processing
// transactions for a block. If no handlers have been added, a nil
// WaitGroup is returned.
type NotifyBalanceRequest struct {
block btcwire.ShaHash
wg chan *sync.WaitGroup
}
// NotifyBalanceSyncer maintains a map of block hashes to WaitGroups
// for worker goroutines that must finish before it is safe to notify
// frontends of a new balance in the blockconnected notification handler.
func NotifyBalanceSyncer(add chan NotifyBalanceWorker,
remove chan btcwire.ShaHash,
access chan NotifyBalanceRequest) {
m := make(map[btcwire.ShaHash]*sync.WaitGroup)
for {
select {
case worker := <-add:
wg, ok := m[worker.block]
if !ok {
wg = &sync.WaitGroup{}
m[worker.block] = wg
}
wg.Add(1)
m[worker.block] = wg
worker.wg <- wg
case block := <-remove:
if _, ok := m[block]; ok {
delete(m, block)
}
case req := <-access:
req.wg <- m[req.block]
}
}
}

View file

@ -65,6 +65,7 @@ var TxFeeIncrement = struct {
// for change (if any).
type CreatedTx struct {
rawTx []byte
txid btcwire.ShaHash
time time.Time
inputs []*tx.Utxo
outputs []tx.Pair
@ -326,6 +327,7 @@ func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, er
out := tx.Pair{
Amount: int64(change),
PubkeyHash: changeAddrHash,
Change: true,
}
outputs = append(outputs, out)
@ -357,10 +359,16 @@ func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, er
}
}
txid, err := msgtx.TxSha()
if err != nil {
return nil, fmt.Errorf("cannot create txid for created tx: %v", err)
}
buf := new(bytes.Buffer)
msgtx.BtcEncode(buf, btcwire.ProtocolVersion)
info := &CreatedTx{
rawTx: buf.Bytes(),
txid: txid,
time: time.Now(),
inputs: selectedInputs,
outputs: outputs,

View file

@ -639,8 +639,16 @@ func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) {
// btcd notifies btcwallet about transactions first, and then sends
// the new block notification. New balance notifications for txs
// in blocks are therefore sent here after all tx notifications
// have arrived.
// have arrived and finished being processed by the handlers.
workers := NotifyBalanceRequest{
block: *hash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.access <- workers
if wg := <-workers.wg; wg != nil {
wg.Wait()
NotifyBalanceSyncerChans.remove <- *hash
}
accountstore.BlockNotify(bs)
// Pass notification to frontends too.

232
tx/tx.go
View file

@ -43,10 +43,32 @@ const (
sendTxHeader
)
// File format versions.
const (
utxoFileVersion uint32 = 0
txFileVersion uint32 = 0
// ReaderFromVersion is an io.ReaderFrom and io.WriterTo that
// can specify any particular wallet file format for reading
// depending on the wallet file version.
type ReaderFromVersion interface {
ReadFromVersion(uint32, io.Reader) (int64, error)
io.WriterTo
}
// Various versions.
var (
// First file version used.
utxoVersFirst uint32 = 0
txVersFirst uint32 = 0
// txVersRecvTxIndex is the version where the txout index
// was added to the RecvTx struct.
txVersRecvTxIndex uint32 = 1
// txVersMarkSentChange is the version where serialized SentTx
// added a flags field, used for marking a sent transaction
// as change.
txVersMarkSentChange uint32 = 2
// Current versions
utxoVersCurrent = utxoVersFirst
txVersCurrent = txVersRecvTxIndex
)
// UtxoStore is a type used for holding all Utxo structures for all
@ -159,6 +181,7 @@ func (p *pubkeyHash) WriteTo(w io.Writer) (int64, error) {
// received by an address in a wallet.
type RecvTx struct {
TxID btcwire.ShaHash
TxOutIdx uint32
TimeReceived int64
BlockHeight int32
BlockHash btcwire.ShaHash
@ -178,8 +201,7 @@ var pairsVar = Pairs([]Pair{})
var _ io.ReaderFrom = &pairsVar
var _ io.WriterTo = &pairsVar
// ReadFrom reades a Pair slice from r. Part of the io.ReaderFrom interface.
func (p *Pairs) ReadFrom(r io.Reader) (int64, error) {
func (p *Pairs) ReadFromVersion(vers uint32, r io.Reader) (int64, error) {
var read int64
nPairsBytes := make([]byte, 4) // Raw bytes for a uint32.
@ -192,7 +214,7 @@ func (p *Pairs) ReadFrom(r io.Reader) (int64, error) {
s := make([]Pair, nPairs)
for i := range s {
n, err := s[i].ReadFrom(r)
n, err := s[i].ReadFromVersion(vers, r)
if err != nil {
return read + n, err
}
@ -203,6 +225,10 @@ func (p *Pairs) ReadFrom(r io.Reader) (int64, error) {
return read, nil
}
func (p *Pairs) ReadFrom(r io.Reader) (int64, error) {
return p.ReadFromVersion(txVersCurrent, r)
}
// WriteTo writes a Pair slice to w. Part of the io.WriterTo interface.
func (p *Pairs) WriteTo(w io.Writer) (int64, error) {
var written int64
@ -234,6 +260,7 @@ func (p *Pairs) WriteTo(w io.Writer) (int64, error) {
type Pair struct {
PubkeyHash pubkeyHash
Amount int64 // Measured in Satoshis
Change bool
}
// Enforce that Pair satisifies the io.ReaderFrom and io.WriterTo
@ -241,6 +268,32 @@ type Pair struct {
var _ io.ReaderFrom = &Pair{}
var _ io.WriterTo = &Pair{}
func (p *Pair) ReadFromVersion(vers uint32, r io.Reader) (int64, error) {
if vers >= txVersMarkSentChange {
// Use latest version
return p.ReadFrom(r)
}
// Old version did not read flags.
var read int64
n, err := p.PubkeyHash.ReadFrom(r)
if err != nil {
return n, err
}
read += n
amountBytes := make([]byte, 8) // raw bytes for a uint64
nr, err := r.Read(amountBytes)
if err != nil {
return read + int64(nr), err
}
read += int64(nr)
p.Amount = int64(binary.LittleEndian.Uint64(amountBytes))
return read, nil
}
// ReadFrom reads a serialized Pair from r. Part of the io.ReaderFrom
// interface.
func (p *Pair) ReadFrom(r io.Reader) (int64, error) {
@ -260,6 +313,15 @@ func (p *Pair) ReadFrom(r io.Reader) (int64, error) {
read += int64(nr)
p.Amount = int64(binary.LittleEndian.Uint64(amountBytes))
// Read flags.
flags := make([]byte, 1) // raw bytes for 1 byte of flags
nr, err = r.Read(flags)
if err != nil {
return read + int64(nr), err
}
read += int64(nr)
p.Change = flags[0]&1<<0 == 1<<0
return read, nil
}
@ -363,7 +425,7 @@ func (u *UtxoStore) WriteTo(w io.Writer) (int64, error) {
// Write file version. This is currently not used.
versionBytes := make([]byte, 4) // bytes for a uint32
binary.LittleEndian.PutUint32(versionBytes, utxoFileVersion)
binary.LittleEndian.PutUint32(versionBytes, utxoVersCurrent)
n, err := w.Write(versionBytes)
if err != nil {
return int64(n), err
@ -383,6 +445,30 @@ func (u *UtxoStore) WriteTo(w io.Writer) (int64, error) {
return written, nil
}
// Insert inserts an Utxo into the store.
func (u *UtxoStore) Insert(utxo *Utxo) {
s := *u
defer func() {
*u = s
}()
// First, iterate through all stored utxos. If an unconfirmed utxo
// (not present in a block) has the same outpoint as this utxo,
// update the block height and hash.
for i := range s {
if bytes.Equal(s[i].Out.Hash[:], utxo.Out.Hash[:]) && s[i].Out.Index == utxo.Out.Index {
// Fill relevant block information.
copy(s[i].BlockHash[:], utxo.BlockHash[:])
s[i].Height = utxo.Height
return
}
}
// After iterating through all UTXOs, it was not a duplicate or
// change UTXO appearing in a block. Append a new Utxo to the end.
s = append(s, utxo)
}
// Rollback removes all utxos from and after the block specified
// by a block height and hash.
//
@ -619,6 +705,7 @@ func (txs *TxStore) ReadFrom(r io.Reader) (int64, error) {
if err != nil {
return int64(n), err
}
vers := binary.LittleEndian.Uint32(versionBytes)
read += int64(n)
store := []interface{}{}
@ -639,24 +726,30 @@ func (txs *TxStore) ReadFrom(r io.Reader) (int64, error) {
read += n
var tx io.ReaderFrom
// Read tx.
switch header {
case recvTxHeader:
tx = new(RecvTx)
t := new(RecvTx)
n, err = t.ReadFromVersion(vers, r)
if err != nil {
return read + n, err
}
read += n
tx = t
case sendTxHeader:
tx = new(SendTx)
t := new(SendTx)
n, err = t.ReadFromVersion(vers, r)
if err != nil {
return read + n, err
}
read += n
tx = t
default:
return n, fmt.Errorf("unknown Tx header")
}
// Read tx
n, err = tx.ReadFrom(r)
if err != nil {
return read + n, err
}
read += n
store = append(store, tx)
}
}
@ -671,7 +764,7 @@ func (txs *TxStore) WriteTo(w io.Writer) (int64, error) {
// Write file version. This is currently not used.
versionBytes := make([]byte, 4) // bytes for a uint32
binary.LittleEndian.PutUint32(versionBytes, utxoFileVersion)
binary.LittleEndian.PutUint32(versionBytes, utxoVersCurrent)
n, err := w.Write(versionBytes)
if err != nil {
return int64(n), err
@ -708,6 +801,41 @@ func (txs *TxStore) WriteTo(w io.Writer) (int64, error) {
return written, nil
}
// InsertRecvTx inserts a RecvTx, checking for duplicates, and updating
// previous entries with the latest block information in tx.
func (txs *TxStore) InsertRecvTx(tx *RecvTx) {
s := *txs
defer func() {
*txs = s
}()
// First, iterate through all stored tx history. If a received tx
// matches the one being added (equal txid and txout idx), update
// it with the new block information.
for i := range s {
recvTx, ok := s[i].(*RecvTx)
if !ok {
// Can only check for equality if the types match.
continue
}
// Found an identical received tx.
if bytes.Equal(recvTx.TxID[:], tx.TxID[:]) &&
recvTx.TxOutIdx == tx.TxOutIdx {
// Fill relevant block information.
copy(recvTx.BlockHash[:], tx.BlockHash[:])
recvTx.BlockHeight = tx.BlockHeight
recvTx.BlockIndex = tx.BlockIndex
recvTx.BlockTime = tx.BlockTime
return
}
}
// No received tx entries with the same outpoint. Append to the end.
s = append(s, tx)
}
// Rollback removes all txs from and after the block specified by a
// block height and hash.
//
@ -755,10 +883,47 @@ func (txs *TxStore) Rollback(height int32, hash *btcwire.ShaHash) (modified bool
return
}
func (tx *RecvTx) ReadFromVersion(vers uint32, r io.Reader) (n int64, err error) {
if vers >= txVersCurrent {
// Use current version.
return tx.ReadFrom(r)
}
// Old file version did not save the txout index.
datas := []interface{}{
&tx.TxID,
// tx index not read.
&tx.TimeReceived,
&tx.BlockHeight,
&tx.BlockHash,
&tx.BlockIndex,
&tx.BlockTime,
&tx.Amount,
&tx.ReceiverHash,
}
var read int64
for _, data := range datas {
switch e := data.(type) {
case io.ReaderFrom:
read, err = e.ReadFrom(r)
default:
read, err = binaryRead(r, binary.LittleEndian, data)
}
if err != nil {
return n + read, err
}
n += read
}
return n, nil
}
// ReadFrom satisifies the io.ReaderFrom interface. A RecTx is read
// in from r with the format:
//
// TxID (32 bytes)
// TxOutIdx (4 bytes, little endian)
// TimeReceived (8 bytes, little endian)
// BlockHeight (4 bytes, little endian)
// BlockHash (32 bytes)
@ -769,6 +934,7 @@ func (txs *TxStore) Rollback(height int32, hash *btcwire.ShaHash) (modified bool
func (tx *RecvTx) ReadFrom(r io.Reader) (n int64, err error) {
datas := []interface{}{
&tx.TxID,
&tx.TxOutIdx,
&tx.TimeReceived,
&tx.BlockHeight,
&tx.BlockHash,
@ -857,23 +1023,14 @@ func (tx *RecvTx) TxInfo(account string, curheight int32,
txInfo["blockindex"] = tx.BlockIndex
txInfo["blocktime"] = tx.BlockTime
txInfo["confirmations"] = curheight - tx.BlockHeight + 1
} else {
txInfo["confirmations"] = 0
}
return txInfo
}
// ReadFrom satisifies the io.WriterTo interface. A SendTx is read
// from r with the format:
//
// TxID (32 bytes)
// Time (8 bytes, little endian)
// BlockHeight (4 bytes, little endian)
// BlockHash (32 bytes)
// BlockIndex (4 bytes, little endian)
// BlockTime (8 bytes, little endian)
// Fee (8 bytes, little endian)
// Receivers (varies)
func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) {
func (tx *SendTx) ReadFromVersion(vers uint32, r io.Reader) (n int64, err error) {
var read int64
datas := []interface{}{
@ -903,6 +1060,21 @@ func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) {
return n, nil
}
// ReadFrom satisifies the io.WriterTo interface. A SendTx is read
// from r with the format:
//
// TxID (32 bytes)
// Time (8 bytes, little endian)
// BlockHeight (4 bytes, little endian)
// BlockHash (32 bytes)
// BlockIndex (4 bytes, little endian)
// BlockTime (8 bytes, little endian)
// Fee (8 bytes, little endian)
// Receivers (varies)
func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) {
return tx.ReadFromVersion(txVersCurrent, r)
}
// WriteTo satisifies the io.WriterTo interface. A SendTx is written to
// w in the format:
//