From 41d758ef5c7af6563e628bb1b3e7fd9ed7ff0e1d Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 1 Jul 2020 13:23:44 -0400 Subject: [PATCH] test wallet server connection --- wallet/network.go | 3 +++ wallet/transport.go | 57 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/wallet/network.go b/wallet/network.go index 7ff05af..20e86ba 100644 --- a/wallet/network.go +++ b/wallet/network.go @@ -76,6 +76,9 @@ func (n *Node) Connect(addrs []string, config *tls.Config) error { if err == nil { break } + if errors.Is(err, ErrTimeout) { + continue + } if e, ok := err.(*net.OpError); ok && e.Err.Error() == "no such host" { // net.errNoSuchHost is not exported, so we have to string-match continue diff --git a/wallet/transport.go b/wallet/transport.go index 60169a5..83a0dab 100644 --- a/wallet/transport.go +++ b/wallet/transport.go @@ -5,9 +5,12 @@ package wallet import ( "bufio" "crypto/tls" + "encoding/json" + "fmt" "net" "time" + "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/v2/extras/stop" log "github.com/sirupsen/logrus" @@ -50,15 +53,31 @@ func NewTransport(addr string, config *tls.Config) (*TCPTransport, error) { t.grp.Add(1) go func() { - t.grp.Done() + defer t.grp.Done() t.listen() }() + err = t.test() + if err != nil { + t.grp.StopAndWait() + return nil, errors.Prefix(addr, err) + } + return t, nil } const delimiter = byte('\n') +func (t *TCPTransport) Send(body []byte) error { + log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) + _, err := t.conn.Write(body) + return err +} + +func (t *TCPTransport) Responses() <-chan []byte { return t.responses } +func (t *TCPTransport) Errors() <-chan error { return t.errors } +func (t *TCPTransport) Shutdown() { t.grp.StopAndWait() } + func (t *TCPTransport) listen() { reader := bufio.NewReader(t.conn) for { @@ -74,12 +93,6 @@ func (t *TCPTransport) listen() { } } -func (t *TCPTransport) Send(body []byte) error { - log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) - _, err := t.conn.Write(body) - return err -} - func (t *TCPTransport) error(err error) { select { case t.errors <- err: @@ -87,11 +100,33 @@ func (t *TCPTransport) error(err error) { } } -func (t *TCPTransport) Responses() <-chan []byte { return t.responses } -func (t *TCPTransport) Errors() <-chan error { return t.errors } +func (t *TCPTransport) test() error { + err := t.Send([]byte(`{"id":1,"method":"server.version"}` + "\n")) + if err != nil { + return errors.Err(err) + } -func (t *TCPTransport) Shutdown() { - t.grp.StopAndWait() + var data []byte + select { + case data = <-t.Responses(): + case <-time.Tick(1 * time.Second): + return errors.Err(ErrTimeout) + } + + var response struct { + Error struct { + Message string `json:"message"` + } `json:"error"` + } + + err = json.Unmarshal(data, &response) + if err != nil { + return errors.Err(err) + } + if response.Error.Message != "" { + return fmt.Errorf(response.Error.Message) + } + return nil } func (t *TCPTransport) close() {