Use btcws for parsing btcd notifications.

This commit is contained in:
Josh Rickmar 2013-11-08 12:45:18 -05:00
parent cdd9bea5db
commit 30db3490c0

View file

@ -254,6 +254,14 @@ func BtcdHandler(ws *websocket.Conn) {
}
}
type notificationHandler func(btcws.Notification)
var notificationHandlers = map[string]notificationHandler{
btcws.BlockConnectedNtfnId: NtfnBlockConnected,
btcws.BlockDisconnectedNtfnId: NtfnBlockDisconnected,
btcws.TxMinedNtfnId: NtfnTxMined,
}
// ProcessBtcdNotificationReply unmarshalls the JSON notification or
// reply received from btcd and decides how to handle it. Replies are
// routed back to the frontend who sent the message, and wallet
@ -330,18 +338,20 @@ func ProcessBtcdNotificationReply(b []byte) {
}
c <- b
} else {
// btcd notification must either be handled by btcwallet or sent
// to all frontends if btcwallet can not handle it.
switch idStr {
case "btcd:blockconnected":
NtfnBlockConnected(r.Result)
case "btcd:blockdisconnected":
NtfnBlockDisconnected(r.Result)
default:
frontendNotificationMaster <- b
// Message is a btcd notification. Check the id and dispatch
// correct handler, or if no handler, pass up to each wallet.
if ntfnHandler, ok := notificationHandlers[idStr]; ok {
n, err := btcws.ParseMarshaledNtfn(idStr, b)
if err != nil {
log.Errorf("Error unmarshaling expected "+
"notification: %v", err)
return
}
ntfnHandler(n)
return
}
frontendNotificationMaster <- b
}
}
@ -359,56 +369,29 @@ func NotifyNewBlockChainHeight(reply chan []byte, height int32) {
// NtfnBlockConnected handles btcd notifications resulting from newly
// connected blocks to the main blockchain.
func NtfnBlockConnected(r interface{}) {
result, ok := r.(map[string]interface{})
func NtfnBlockConnected(n btcws.Notification) {
bcn, ok := n.(*btcws.BlockConnectedNtfn)
if !ok {
log.Error("blockconnected notification: invalid result")
log.Errorf("%v handler: unexpected type", n.Id())
return
}
hashBE, ok := result["hash"].(string)
if !ok {
log.Error("blockconnected notification: invalid hash")
return
}
hash, err := btcwire.NewShaHashFromStr(hashBE)
hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
if err != nil {
log.Error("btcd:blockconnected handler: invalid hash string")
log.Errorf("%v handler: invalid hash string", n.Id())
return
}
heightf, ok := result["height"].(float64)
if !ok {
log.Error("blockconnected notification: invalid height")
return
}
height := int32(heightf)
var minedTxs []TXID
if iminedTxs, ok := result["minedtxs"].([]interface{}); ok {
minedTxs = make([]TXID, len(iminedTxs))
for i, iminedTx := range iminedTxs {
minedTx, ok := iminedTx.(string)
if !ok {
log.Error("blockconnected notification: mined tx is not a string")
continue
}
minedTxs[i] = TXID(minedTx)
}
}
curBlock.Lock()
curBlock.BlockStamp = wallet.BlockStamp{
Height: height,
Hash: *hash,
}
curBlock.Unlock()
// btcd notifies btcwallet about transactions first, and then sends
// the block notification. This prevents any races from saving a
// synced-to block before all notifications from the block have been
// processed.
bs := &wallet.BlockStamp{
Height: height,
Height: bcn.Height,
Hash: *hash,
}
curBlock.Lock()
curBlock.BlockStamp = *bs
curBlock.Unlock()
for _, w := range wallets.m {
// We do not write synced info immediatelly out to disk.
// If btcd is performing an IBD, that would result in
@ -425,36 +408,7 @@ func NtfnBlockConnected(r interface{}) {
}
// Notify frontends of new blockchain height.
NotifyNewBlockChainHeight(frontendNotificationMaster, height)
// Remove all mined transactions from pool.
UnminedTxs.Lock()
for _, txid := range minedTxs {
delete(UnminedTxs.m, txid)
}
UnminedTxs.Unlock()
}
// ResendUnminedTxs resends any transactions in the unmined
// transaction pool to btcd using the 'sendrawtransaction' RPC
// command.
func resendUnminedTxs() {
for _, createdTx := range UnminedTxs.m {
n := <-NewJSONID
var id interface{} = fmt.Sprintf("btcwallet(%v)", n)
m, err := btcjson.CreateMessageWithId("sendrawtransaction", id, string(createdTx.rawTx))
if err != nil {
log.Errorf("cannot create resend request: %v", err)
continue
}
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
// Do nothing, just remove the handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- m
}
NotifyNewBlockChainHeight(frontendNotificationMaster, bcn.Height)
}
// NtfnBlockDisconnected handles btcd notifications resulting from
@ -462,35 +416,45 @@ func resendUnminedTxs() {
// switch and notifies frontends of the new blockchain height.
//
// TODO(jrick): Rollback Utxo and Tx data
func NtfnBlockDisconnected(r interface{}) {
result, ok := r.(map[string]interface{})
func NtfnBlockDisconnected(n btcws.Notification) {
bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
if !ok {
log.Error("blockdisconnected notification: invalid result")
log.Errorf("%v handler: unexpected type", n.Id())
return
}
hashBE, ok := result["hash"].(string)
if !ok {
log.Error("blockdisconnected notification: invalid hash")
return
}
hash, err := btcwire.NewShaHashFromStr(hashBE)
hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
if err != nil {
log.Error("btcd:blockdisconnected handler: invalid hash string")
log.Errorf("%v handler: invalid hash string", n.Id())
return
}
heightf, ok := result["height"].(float64)
if !ok {
log.Error("blockdisconnected notification: invalid height")
}
height := int32(heightf)
// Rollback Utxo and Tx data stores.
go func() {
wallets.Rollback(height, hash)
wallets.Rollback(bdn.Height, hash)
}()
// Notify frontends of new blockchain height.
NotifyNewBlockChainHeight(frontendNotificationMaster, height)
NotifyNewBlockChainHeight(frontendNotificationMaster, bdn.Height)
}
// NtfnTxMined handles btcd notifications resulting from newly
// mined transactions that originated from this wallet.
func NtfnTxMined(n btcws.Notification) {
tmn, ok := n.(*btcws.TxMinedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Id())
return
}
hash, err := btcwire.NewShaHashFromStr(tmn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Id())
return
}
// Remove mined transaction from pool.
UnminedTxs.Lock()
delete(UnminedTxs.m, TXID(hash[:]))
UnminedTxs.Unlock()
}
var duplicateOnce sync.Once
@ -555,6 +519,28 @@ func BtcdConnect(reply chan error) {
reply <- ErrConnLost
}
// resendUnminedTxs resends any transactions in the unmined
// transaction pool to btcd using the 'sendrawtransaction' RPC
// command.
func resendUnminedTxs() {
for _, createdTx := range UnminedTxs.m {
n := <-NewJSONID
var id interface{} = fmt.Sprintf("btcwallet(%v)", n)
m, err := btcjson.CreateMessageWithId("sendrawtransaction", id, string(createdTx.rawTx))
if err != nil {
log.Errorf("cannot create resend request: %v", err)
continue
}
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
// Do nothing, just remove the handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- m
}
}
// BtcdHandshake first checks that the websocket connection between
// btcwallet and btcd is valid, that is, that there are no mismatching
// settings between the two processes (such as running on different