Use a single handler (per wallet) for all tx notifications.

This commit is contained in:
Josh Rickmar 2013-09-05 11:19:48 -04:00
parent 897fa1448b
commit 019df772b1
2 changed files with 132 additions and 112 deletions

242
cmd.go
View file

@ -79,9 +79,10 @@ func main() {
type BtcWallet struct { type BtcWallet struct {
*wallet.Wallet *wallet.Wallet
mtx sync.RWMutex mtx sync.RWMutex
dirty bool dirty bool
UtxoStore struct { NewBlockTxSeqN uint64
UtxoStore struct {
sync.RWMutex sync.RWMutex
dirty bool dirty bool
s tx.UtxoStore s tx.UtxoStore
@ -183,13 +184,35 @@ func OpenWallet(cfg *config, account string) (*BtcWallet, error) {
} }
func (w *BtcWallet) Track() { func (w *BtcWallet) Track() {
wallets.Lock() seq.Lock()
name := w.Name() n := seq.n
if wallets.m[name] == nil { seq.n++
wallets.m[name] = w seq.Unlock()
}
wallets.Unlock()
// Use goroutines and a WaitGroup to prevent unnecessary waiting for
// released locks.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
wallets.Lock()
name := w.Name()
if wallets.m[name] == nil {
wallets.m[name] = w
}
wallets.Unlock()
}()
go func() {
defer wg.Done()
w.mtx.Lock()
w.NewBlockTxSeqN = n
w.mtx.Unlock()
}()
wg.Wait()
replyHandlers.Lock()
replyHandlers.m[n] = w.NewBlockTxHandler
replyHandlers.Unlock()
for _, addr := range w.GetActiveAddresses() { for _, addr := range w.GetActiveAddresses() {
go w.ReqNewTxsForAddress(addr) go w.ReqNewTxsForAddress(addr)
} }
@ -231,10 +254,9 @@ func (w *BtcWallet) RescanForAddress(addr string, blocks ...int) {
} }
func (w *BtcWallet) ReqNewTxsForAddress(addr string) { func (w *BtcWallet) ReqNewTxsForAddress(addr string) {
seq.Lock() w.mtx.RLock()
n := seq.n n := w.NewBlockTxSeqN
seq.n++ w.mtx.RUnlock()
seq.Unlock()
m := &btcjson.Message{ m := &btcjson.Message{
Jsonrpc: "1.0", Jsonrpc: "1.0",
@ -244,103 +266,101 @@ func (w *BtcWallet) ReqNewTxsForAddress(addr string) {
} }
msg, _ := json.Marshal(m) msg, _ := json.Marshal(m)
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}) bool {
// TODO(jrick): btcd also sends the block hash in the reply.
// Do we want it saved as well?
v, ok := result.(map[string]interface{})
if !ok {
log.Error("Tx Handler: Unexpected result type.")
return false
}
sender58, ok := v["sender"].(string)
if !ok {
log.Error("Tx Handler: Unspecified sender.")
return false
}
receiver58, ok := v["receiver"].(string)
if !ok {
log.Error("Tx Handler: Unspecified receiver.")
return false
}
height, ok := v["height"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified height.")
return false
}
txhashBE, ok := v["txhash"].(string)
if !ok {
log.Error("Tx Handler: Unspecified transaction hash.")
return false
}
index, ok := v["index"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified transaction index.")
return false
}
amt, ok := v["amount"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified amount.")
return false
}
spent, ok := v["spent"].(bool)
if !ok {
log.Error("Tx Handler: Unspecified spent field.")
return false
}
// btcd sends the tx hashe as a BE string. Convert to a
// LE ShaHash.
txhash, err := btcwire.NewShaHashFromStr(txhashBE)
if err != nil {
log.Error("Tx Handler: Tx hash string cannot be parsed: " + err.Error())
return false
}
sender := btcutil.Base58Decode(sender58)
receiver := btcutil.Base58Decode(receiver58)
go func() {
t := &tx.RecvTx{
Amt: int64(amt),
}
copy(t.TxHash[:], txhash[:])
copy(t.SenderAddr[:], sender)
copy(t.ReceiverAddr[:], receiver)
w.TxStore.Lock()
txs := w.TxStore.s
w.TxStore.s = append(txs, t)
w.TxStore.dirty = true
w.TxStore.Unlock()
}()
go func() {
// Do not add output to utxo store if spent.
if spent {
return
}
u := &tx.Utxo{
Amt: int64(amt),
Height: int64(height),
}
copy(u.Out.Hash[:], txhash[:])
u.Out.Index = uint32(index)
copy(u.Addr[:], receiver)
w.UtxoStore.Lock()
// All newly saved utxos are first classified as unconfirmed.
utxos := w.UtxoStore.s.Unconfirmed
w.UtxoStore.s.Unconfirmed = append(utxos, u)
w.UtxoStore.dirty = true
w.UtxoStore.Unlock()
}()
// Never remove this handler.
return false
}
replyHandlers.Unlock()
btcdMsgs <- msg btcdMsgs <- msg
} }
func (w *BtcWallet) NewBlockTxHandler(result interface{}) bool {
// TODO(jrick): btcd also sends the block hash in the reply.
// Do we want it saved as well?
v, ok := result.(map[string]interface{})
if !ok {
log.Error("Tx Handler: Unexpected result type.")
return false
}
sender58, ok := v["sender"].(string)
if !ok {
log.Error("Tx Handler: Unspecified sender.")
return false
}
receiver58, ok := v["receiver"].(string)
if !ok {
log.Error("Tx Handler: Unspecified receiver.")
return false
}
height, ok := v["height"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified height.")
return false
}
txhashBE, ok := v["txhash"].(string)
if !ok {
log.Error("Tx Handler: Unspecified transaction hash.")
return false
}
index, ok := v["index"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified transaction index.")
return false
}
amt, ok := v["amount"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified amount.")
return false
}
spent, ok := v["spent"].(bool)
if !ok {
log.Error("Tx Handler: Unspecified spent field.")
return false
}
// btcd sends the tx hash as a BE string. Convert to a
// LE ShaHash.
txhash, err := btcwire.NewShaHashFromStr(txhashBE)
if err != nil {
log.Error("Tx Handler: Tx hash string cannot be parsed: " + err.Error())
return false
}
sender := btcutil.Base58Decode(sender58)
receiver := btcutil.Base58Decode(receiver58)
go func() {
t := &tx.RecvTx{
Amt: int64(amt),
}
copy(t.TxHash[:], txhash[:])
copy(t.SenderAddr[:], sender)
copy(t.ReceiverAddr[:], receiver)
w.TxStore.Lock()
txs := w.TxStore.s
w.TxStore.s = append(txs, t)
w.TxStore.dirty = true
w.TxStore.Unlock()
}()
go func() {
// Do not add output to utxo store if spent.
if spent {
return
}
u := &tx.Utxo{
Amt: int64(amt),
Height: int64(height),
}
copy(u.Out.Hash[:], txhash[:])
u.Out.Index = uint32(index)
copy(u.Addr[:], receiver)
w.UtxoStore.Lock()
// All newly saved utxos are first classified as unconfirmed.
utxos := w.UtxoStore.s.Unconfirmed
w.UtxoStore.s.Unconfirmed = append(utxos, u)
w.UtxoStore.dirty = true
w.UtxoStore.Unlock()
}()
// Never remove this handler.
return false
}

View file

@ -319,7 +319,7 @@ func ListenAndServe() error {
// requests for each channel in the set. // requests for each channel in the set.
go frontendListenerDuplicator() go frontendListenerDuplicator()
// XXX(jrick): We need some sort of authentication before websocket // TODO(jrick): We need some sort of authentication before websocket
// connections are allowed, and perhaps TLS on the server as well. // connections are allowed, and perhaps TLS on the server as well.
http.Handle("/frontend", websocket.Handler(frontendReqsNotifications)) http.Handle("/frontend", websocket.Handler(frontendReqsNotifications))
if err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.SvrPort), nil); err != nil { if err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.SvrPort), nil); err != nil {