reflector.go/wallet/network.go

263 lines
5.2 KiB
Go
Raw Normal View History

2018-09-01 01:50:22 +02:00
package wallet
2018-09-01 01:50:09 +02:00
// copied from https://github.com/d4l3k/go-electrum
import (
"crypto/tls"
"encoding/json"
2018-09-20 17:24:36 +02:00
"math/rand"
"net"
2018-09-01 01:50:09 +02:00
"sync"
2018-09-20 17:24:36 +02:00
"time"
2019-11-14 01:11:35 +01:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
2019-01-16 17:41:58 +01:00
2018-09-20 17:24:36 +02:00
log "github.com/sirupsen/logrus"
2019-12-29 01:24:26 +01:00
"go.uber.org/atomic"
2018-09-01 01:50:09 +02:00
)
// Version strings identifying this client.
// NOTE(review): presumably sent to the wallet server during version
// negotiation; the use site is not in this part of the file, so confirm.
const (
	// ClientVersion is the version of this client implementation.
	ClientVersion = "0.0.1"

	// ProtocolVersion is the protocol version this client declares.
	ProtocolVersion = "1.0"
)
var (
2019-01-16 17:41:58 +01:00
ErrNotImplemented = errors.Base("not implemented")
ErrNodeConnected = errors.Base("node already connected")
ErrConnectFailed = errors.Base("failed to connect")
ErrTimeout = errors.Base("timeout")
2018-09-01 01:50:09 +02:00
)
2019-01-16 17:45:28 +01:00
// response is what the listen loop hands to a waiting handler: either the raw
// bytes of a server message or the error that message resulted in.
type response struct {
	// data holds the undecoded message; it is only set when err is nil.
	data []byte

	// err is set when the message could not be decoded or carried an error.
	err error
}
2018-09-01 01:50:09 +02:00
type Node struct {
2018-09-20 17:24:36 +02:00
transport *TCPTransport
nextId atomic.Uint32
grp *stop.Group
2018-09-01 01:50:09 +02:00
2018-09-20 20:24:30 +02:00
handlersMu *sync.RWMutex
2019-01-16 17:45:28 +01:00
handlers map[uint32]chan response
2018-09-01 01:50:09 +02:00
2018-09-20 20:24:30 +02:00
pushHandlersMu *sync.RWMutex
2019-01-16 17:45:28 +01:00
pushHandlers map[string][]chan response
2019-01-16 17:41:58 +01:00
timeout time.Duration
2018-09-01 01:50:09 +02:00
}
// NewNode creates a new node.
func NewNode() *Node {
2018-09-20 17:24:36 +02:00
return &Node{
2019-01-16 17:45:28 +01:00
handlers: make(map[uint32]chan response),
pushHandlers: make(map[string][]chan response),
2018-09-20 20:24:30 +02:00
handlersMu: &sync.RWMutex{},
pushHandlersMu: &sync.RWMutex{},
grp: stop.New(),
2019-01-16 17:41:58 +01:00
timeout: 1 * time.Second,
2018-09-01 01:50:09 +02:00
}
}
2018-09-20 17:24:36 +02:00
// Connect creates a new connection to the specified address.
func (n *Node) Connect(addrs []string, config *tls.Config) error {
2018-09-01 01:50:09 +02:00
if n.transport != nil {
return ErrNodeConnected
}
2018-09-20 17:24:36 +02:00
// shuffle addresses for load balancing
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
var err error
for _, addr := range addrs {
n.transport, err = NewTransport(addr, config)
if err == nil {
break
}
if e, ok := err.(*net.OpError); ok && e.Err.Error() == "no such host" {
// net.errNoSuchHost is not exported, so we have to string-match
continue
}
2018-09-01 01:50:09 +02:00
return err
}
2018-09-20 17:24:36 +02:00
if n.transport == nil {
return ErrConnectFailed
}
log.Debugf("wallet connected to %s", n.transport.conn.RemoteAddr())
n.grp.Add(1)
go func() {
defer n.grp.Done()
<-n.grp.Ch()
n.transport.Shutdown()
}()
n.grp.Add(1)
go func() {
defer n.grp.Done()
n.handleErrors()
}()
n.grp.Add(1)
go func() {
defer n.grp.Done()
n.listen()
}()
2018-09-01 01:50:09 +02:00
return nil
}
2018-09-20 17:24:36 +02:00
// Shutdown stops the node's stop group and blocks until the goroutines
// registered with it have finished.
func (n *Node) Shutdown() {
	n.grp.StopAndWait()
}
func (n *Node) handleErrors() {
for {
select {
case <-n.grp.Ch():
return
case err := <-n.transport.Errors():
n.err(err)
}
2018-09-01 01:50:09 +02:00
}
}
// err handles errors produced by the foreign node.
func (n *Node) err(err error) {
// TODO: Better error handling.
2018-09-20 17:24:36 +02:00
log.Error(err)
2018-09-01 01:50:09 +02:00
}
// listen processes messages from the server.
func (n *Node) listen() {
for {
select {
2018-09-20 17:24:36 +02:00
case <-n.grp.Ch():
2018-09-01 01:50:09 +02:00
return
case bytes := <-n.transport.Responses():
2019-01-16 17:45:28 +01:00
msg := &struct {
Id uint32 `json:"id"`
Method string `json:"method"`
Error struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}{}
msg2 := &struct {
Id uint32 `json:"id"`
Method string `json:"method"`
Error struct {
Code int `json:"code"`
Message struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"message"`
} `json:"error"`
}{}
r := response{}
err := json.Unmarshal(bytes, msg)
if err != nil {
// try msg2, a hack around the weird error-in-error response we sometimes get from wallet server
// maybe that happens because the wallet server passes a lbrycrd error through to us?
if err2 := json.Unmarshal(bytes, msg2); err2 == nil {
err = nil
msg.Id = msg2.Id
msg.Method = msg2.Method
msg.Error = msg2.Error.Message
}
2018-09-01 01:50:09 +02:00
}
2019-01-16 17:41:58 +01:00
if err != nil {
n.err(err)
r.err = errors.Err(err)
} else if len(msg.Error.Message) > 0 {
2019-01-16 17:45:28 +01:00
r.err = errors.Base("%d: %s", msg.Error.Code, msg.Error.Message)
2019-01-16 17:41:58 +01:00
} else {
r.data = bytes
2018-09-01 01:50:09 +02:00
}
2019-01-16 17:41:58 +01:00
2018-09-01 01:50:09 +02:00
if len(msg.Method) > 0 {
2018-09-20 17:24:36 +02:00
n.pushHandlersMu.RLock()
2018-09-01 01:50:09 +02:00
handlers := n.pushHandlers[msg.Method]
2018-09-20 17:24:36 +02:00
n.pushHandlersMu.RUnlock()
2018-09-01 01:50:09 +02:00
for _, handler := range handlers {
select {
2019-01-16 17:41:58 +01:00
case handler <- r:
2018-09-01 01:50:09 +02:00
default:
}
}
}
2018-09-20 17:24:36 +02:00
n.handlersMu.RLock()
2018-09-01 01:50:09 +02:00
c, ok := n.handlers[msg.Id]
2018-09-20 17:24:36 +02:00
n.handlersMu.RUnlock()
2018-09-01 01:50:09 +02:00
if ok {
2019-01-16 17:41:58 +01:00
c <- r
2018-09-01 01:50:09 +02:00
}
}
}
}
// listenPush (currently commented out) returns a channel of messages matching the method.
2018-09-24 16:31:14 +02:00
//func (n *Node) listenPush(method string) <-chan []byte {
// c := make(chan []byte, 1)
// n.pushHandlersMu.Lock()
// defer n.pushHandlersMu.Unlock()
// n.pushHandlers[method] = append(n.pushHandlers[method], c)
// return c
//}
2018-09-01 01:50:09 +02:00
// request makes a request to the server and unmarshals the response into v.
func (n *Node) request(method string, params []string, v interface{}) error {
2019-01-16 17:45:28 +01:00
msg := struct {
Id uint32 `json:"id"`
Method string `json:"method"`
Params []string `json:"params"`
}{
2018-09-20 17:24:36 +02:00
Id: n.nextId.Load(),
2018-09-01 01:50:09 +02:00
Method: method,
Params: params,
}
2018-09-20 17:24:36 +02:00
n.nextId.Inc()
2018-09-01 01:50:09 +02:00
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
2018-09-20 17:24:36 +02:00
bytes = append(bytes, delimiter)
2019-01-16 17:45:28 +01:00
c := make(chan response, 1)
2018-09-01 01:50:09 +02:00
2018-09-20 17:24:36 +02:00
n.handlersMu.Lock()
2018-09-01 01:50:09 +02:00
n.handlers[msg.Id] = c
2018-09-20 17:24:36 +02:00
n.handlersMu.Unlock()
2018-09-01 01:50:09 +02:00
2019-01-16 17:41:58 +01:00
err = n.transport.Send(bytes)
if err != nil {
return err
}
2019-01-16 17:45:28 +01:00
var r response
2019-01-16 17:41:58 +01:00
select {
case r = <-c:
case <-time.After(n.timeout):
2019-01-16 17:45:28 +01:00
r = response{err: errors.Err(ErrTimeout)}
2019-01-16 17:41:58 +01:00
}
2018-09-01 01:50:09 +02:00
2018-09-20 17:24:36 +02:00
n.handlersMu.Lock()
2018-09-01 01:50:09 +02:00
delete(n.handlers, msg.Id)
2018-09-20 17:24:36 +02:00
n.handlersMu.Unlock()
2018-09-01 01:50:09 +02:00
2019-01-16 17:41:58 +01:00
if r.err != nil {
return r.err
}
return json.Unmarshal(r.data, v)
2018-09-01 01:50:09 +02:00
}