successfully shut down wallet server

This commit is contained in:
Alex Grintsvayg 2020-07-01 14:42:23 -04:00 committed by Niko Storni
parent 41d758ef5c
commit df4f42db82
3 changed files with 58 additions and 18 deletions

View file

@ -40,9 +40,9 @@ func NewServer(store store.BlobStore) *Server {
// Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() {
log.Debug("shutting down peer server")
log.Debug("shutting down http3 peer server")
s.grp.StopAndWait()
log.Debug("peer server stopped")
log.Debug("http3 peer server stopped")
}
func (s *Server) logError(e error) {

View file

@ -12,6 +12,7 @@ import (
"github.com/lbryio/reflector.go/wallet"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
log "github.com/sirupsen/logrus"
)
@ -19,21 +20,28 @@ import (
const blocklistURL = "https://api.lbry.com/file/list_blocked"
func (s *Server) enableBlocklist(b store.Blocklister) {
// TODO: updateBlocklist should be killed when server is shutting down
updateBlocklist(b)
walletServers := []string{
"spv25.lbry.com:50001",
"spv26.lbry.com:50001",
"spv19.lbry.com:50001",
"spv14.lbry.com:50001",
}
updateBlocklist(b, walletServers, s.grp.Ch())
t := time.NewTicker(12 * time.Hour)
for {
select {
case <-s.grp.Ch():
return
case <-t.C:
updateBlocklist(b)
updateBlocklist(b, walletServers, s.grp.Ch())
}
}
}
func updateBlocklist(b store.Blocklister) {
values, err := blockedSdHashes()
func updateBlocklist(b store.Blocklister, walletServers []string, stopper stop.Chan) {
log.Debugf("blocklist update starting")
values, err := blockedSdHashes(walletServers, stopper)
if err != nil {
log.Error(err)
return
@ -50,10 +58,12 @@ func updateBlocklist(b store.Blocklister) {
log.Error(err)
}
}
log.Debugf("blocklist update done")
}
func blockedSdHashes() (map[string]valOrErr, error) {
resp, err := http.Get(blocklistURL)
func blockedSdHashes(walletServers []string, stopper stop.Chan) (map[string]valOrErr, error) {
client := http.Client{Timeout: 1 * time.Second}
resp, err := client.Get(blocklistURL)
if err != nil {
return nil, errors.Err(err)
}
@ -80,7 +90,7 @@ func blockedSdHashes() (map[string]valOrErr, error) {
return nil, errors.Prefix("list_blocked API call", r.Error)
}
return sdHashesForOutpoints(r.Data.Outpoints)
return sdHashesForOutpoints(walletServers, r.Data.Outpoints, stopper)
}
type valOrErr struct {
@ -89,22 +99,33 @@ type valOrErr struct {
}
// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints
func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
func sdHashesForOutpoints(walletServers, outpoints []string, stopper stop.Chan) (map[string]valOrErr, error) {
values := make(map[string]valOrErr)
node := wallet.NewNode()
defer node.Shutdown()
err := node.Connect([]string{
"spv25.lbry.com:50001",
"spv26.lbry.com:50001",
"spv19.lbry.com:50001",
"spv14.lbry.com:50001",
}, nil)
err := node.Connect(walletServers, nil)
if err != nil {
return nil, errors.Err(err)
}
done := make(chan bool)
go func() {
select {
case <-done:
case <-stopper:
}
node.Shutdown()
}()
OutpointLoop:
for _, outpoint := range outpoints {
select {
case <-stopper:
break OutpointLoop
default:
}
parts := strings.Split(outpoint, ":")
if len(parts) != 2 {
values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")}
@ -127,5 +148,10 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
values[outpoint] = valOrErr{Value: hash, Err: nil}
}
select {
case done <- true:
default: // in case of race where stopper got stopped right after loop finished
}
return values, nil
}

View file

@ -115,7 +115,13 @@ func (n *Node) Connect(addrs []string, config *tls.Config) error {
}
func (n *Node) Shutdown() {
var addr net.Addr
if n.transport != nil {
addr = n.transport.conn.RemoteAddr()
}
log.Debugf("shutting down wallet %s", addr)
n.grp.StopAndWait()
log.Debugf("wallet stopped")
}
func (n *Node) handleErrors() {
@ -138,6 +144,12 @@ func (n *Node) err(err error) {
// listen processes messages from the server.
func (n *Node) listen() {
for {
select {
case <-n.grp.Ch():
return
default:
}
select {
case <-n.grp.Ch():
return
@ -248,6 +260,8 @@ func (n *Node) request(method string, params []string, v interface{}) error {
var r response
select {
case <-n.grp.Ch():
return nil
case r = <-c:
case <-time.After(n.timeout):
r = response{err: errors.Err(ErrTimeout)}