From df4f42db82136299fe6a6926def71393c68af2ee Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 1 Jul 2020 14:42:23 -0400 Subject: [PATCH] successfully shut down wallet server --- peer/http3/server.go | 4 +-- reflector/blocklist.go | 58 ++++++++++++++++++++++++++++++------------ wallet/network.go | 14 ++++++++++ 3 files changed, 58 insertions(+), 18 deletions(-) diff --git a/peer/http3/server.go b/peer/http3/server.go index c9bab5e..6f18db3 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -40,9 +40,9 @@ func NewServer(store store.BlobStore) *Server { // Shutdown gracefully shuts down the peer server. func (s *Server) Shutdown() { - log.Debug("shutting down peer server") + log.Debug("shutting down http3 peer server") s.grp.StopAndWait() - log.Debug("peer server stopped") + log.Debug("http3 peer server stopped") } func (s *Server) logError(e error) { diff --git a/reflector/blocklist.go b/reflector/blocklist.go index ed2ba9c..9abfc4a 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -12,6 +12,7 @@ import ( "github.com/lbryio/reflector.go/wallet" "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" log "github.com/sirupsen/logrus" ) @@ -19,21 +20,28 @@ import ( const blocklistURL = "https://api.lbry.com/file/list_blocked" func (s *Server) enableBlocklist(b store.Blocklister) { - // TODO: updateBlocklist should be killed when server is shutting down - updateBlocklist(b) + walletServers := []string{ + "spv25.lbry.com:50001", + "spv26.lbry.com:50001", + "spv19.lbry.com:50001", + "spv14.lbry.com:50001", + } + + updateBlocklist(b, walletServers, s.grp.Ch()) t := time.NewTicker(12 * time.Hour) for { select { case <-s.grp.Ch(): return case <-t.C: - updateBlocklist(b) + updateBlocklist(b, walletServers, s.grp.Ch()) } } } -func updateBlocklist(b store.Blocklister) { - values, err := blockedSdHashes() +func updateBlocklist(b store.Blocklister, walletServers []string, stopper stop.Chan) { + log.Debugf("blocklist update starting") + values, err := blockedSdHashes(walletServers, stopper) if err != nil { log.Error(err) return @@ -50,10 +58,12 @@ func updateBlocklist(b store.Blocklister) { log.Error(err) } } + log.Debugf("blocklist update done") } -func blockedSdHashes() (map[string]valOrErr, error) { - resp, err := http.Get(blocklistURL) +func blockedSdHashes(walletServers []string, stopper stop.Chan) (map[string]valOrErr, error) { + client := http.Client{Timeout: 1 * time.Second} + resp, err := client.Get(blocklistURL) if err != nil { return nil, errors.Err(err) } @@ -80,7 +90,7 @@ func blockedSdHashes() (map[string]valOrErr, error) { return nil, errors.Prefix("list_blocked API call", r.Error) } - return sdHashesForOutpoints(r.Data.Outpoints) + return sdHashesForOutpoints(walletServers, r.Data.Outpoints, stopper) } type valOrErr struct { @@ -89,22 +99,33 @@ type valOrErr struct { } // sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints -func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { +func sdHashesForOutpoints(walletServers, outpoints []string, stopper stop.Chan) (map[string]valOrErr, error) { values := make(map[string]valOrErr) node := wallet.NewNode() - defer node.Shutdown() - err := node.Connect([]string{ - "spv25.lbry.com:50001", - "spv26.lbry.com:50001", - "spv19.lbry.com:50001", - "spv14.lbry.com:50001", - }, nil) + err := node.Connect(walletServers, nil) if err != nil { return nil, errors.Err(err) } + done := make(chan bool) + + go func() { + select { + case <-done: + case <-stopper: + } + node.Shutdown() + }() + +OutpointLoop: for _, outpoint := range outpoints { + select { + case <-stopper: + break OutpointLoop + default: + } + parts := strings.Split(outpoint, ":") if len(parts) != 2 { values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")} @@ -127,5 +148,10 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { values[outpoint] = valOrErr{Value: hash, Err: nil} } + select { + case done <- true: + default: // in case of race where stopper got stopped right after loop finished + } + return values, nil } diff --git a/wallet/network.go b/wallet/network.go index 20e86ba..10e4edf 100644 --- a/wallet/network.go +++ b/wallet/network.go @@ -115,7 +115,13 @@ func (n *Node) Connect(addrs []string, config *tls.Config) error { } func (n *Node) Shutdown() { + var addr net.Addr + if n.transport != nil { + addr = n.transport.conn.RemoteAddr() + } + log.Debugf("shutting down wallet %s", addr) n.grp.StopAndWait() + log.Debugf("wallet stopped") } func (n *Node) handleErrors() { @@ -138,6 +144,12 @@ func (n *Node) err(err error) { // listen processes messages from the server. func (n *Node) listen() { for { + select { + case <-n.grp.Ch(): + return + default: + } + select { case <-n.grp.Ch(): return @@ -248,6 +260,8 @@ func (n *Node) request(method string, params []string, v interface{}) error { var r response select { + case <-n.grp.Ch(): + return nil case r = <-c: case <-time.After(n.timeout): r = response{err: errors.Err(ErrTimeout)}