Cleanup shutdown and peers subscribe

this has errors currently, need to figure out data race
This commit is contained in:
Jeffrey Picard 2022-11-04 01:03:28 +00:00
parent 317cdf7129
commit d90a968afc
12 changed files with 201 additions and 44 deletions

View file

@ -670,7 +670,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() {
db.Grp.StopAndWait()
// db.Grp.StopAndWait()
log.Println("Calling cleanup...")
db.Cleanup()
log.Println("Leaving Shutdown...")

41
main.go
View file

@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"strconv"
"time"
_ "net/http/pprof"
@ -14,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
@ -34,38 +34,39 @@ func main() {
stopGroup := stop.New()
// defer stopGroup.Stop()
initsignals(stopGroup.Ch())
initsignals()
interrupt := interruptListener()
// s := server.MakeHubServer(ctxWCancel, args)
s := server.MakeHubServer(stopGroup, args)
go s.Run()
defer func() {
log.Println("Shutting down server...")
// defer func() {
// log.Println("Shutting down server...")
if s.EsClient != nil {
log.Println("Stopping es client...")
s.EsClient.Stop()
}
if s.GrpcServer != nil {
log.Println("Stopping grpc server...")
s.GrpcServer.GracefulStop()
}
if s.DB != nil {
log.Println("Stopping database connection...")
s.DB.Shutdown()
}
// if s.EsClient != nil {
// log.Println("Stopping es client...")
// s.EsClient.Stop()
// }
// if s.GrpcServer != nil {
// log.Println("Stopping grpc server...")
// s.GrpcServer.GracefulStop()
// }
// if s.DB != nil {
// log.Println("Stopping database connection...")
// s.DB.Shutdown()
// }
log.Println("Returning from main...")
}()
// log.Println("Returning from main...")
// }()
defer s.Stop()
<-interrupt
return
}
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(),
conn, err := grpc.Dial("localhost:"+string(args.Port),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {

View file

@ -3,7 +3,6 @@ package server
import (
"bufio"
"context"
"log"
"math"
"net"
"os"
@ -14,7 +13,9 @@ import (
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Peer holds relevant information about peers that we know about.
@ -99,7 +100,7 @@ retry:
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
conn, err := grpc.DialContext(ctx,
"0.0.0.0:"+port,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
@ -172,7 +173,7 @@ func (s *Server) subscribeToPeer(peer *Peer) error {
conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
@ -208,7 +209,7 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
@ -278,7 +279,7 @@ func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
conn, err := grpc.DialContext(ctx,
peerToNotify.Address+":"+peerToNotify.Port,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
@ -370,6 +371,10 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
metrics.PeersKnown.Inc()
s.writePeers()
s.notifyPeerSubs(newPeer)
// This is weird because we're doing grpc and jsonrpc here.
// Do we still want to custom grpc?
log.Warn("Sending peer to NotifierChan")
s.NotifierChan <- newPeer
// Subscribe to all our peers for now
if subscribe {

View file

@ -51,6 +51,7 @@ func TestAddPeer(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartNotifier = false
tests := []struct {
name string
@ -92,6 +93,7 @@ func TestAddPeer(t *testing.T) {
log.Println(err)
}
}
hubServer.Stop()
var m = &dto.Metric{}
if err := metrics.PeersKnown.Write(m); err != nil {
t.Errorf("Error getting metrics %+v\n", err)
@ -111,6 +113,7 @@ func TestPeerWriter(t *testing.T) {
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableWritePeers = false
args.DisableStartNotifier = false
tests := []struct {
name string
@ -156,6 +159,7 @@ func TestPeerWriter(t *testing.T) {
if got != tt.want {
t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want)
}
hubServer.Stop()
})
}
@ -167,8 +171,11 @@ func TestAddPeerEndpoint(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartNotifier = false
args2 := server.MakeDefaultTestArgs()
args2.DisableStartNotifier = false
args2.Port = 50052
args2.NotifierPort = "18081"
tests := []struct {
name string
@ -219,8 +226,8 @@ func TestAddPeerEndpoint(t *testing.T) {
log.Println(err)
}
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
// hubServer.GrpcServer.GracefulStop()
// hubServer2.GrpcServer.GracefulStop()
got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()()
if got1 != tt.wantServerOne {
@ -229,6 +236,8 @@ func TestAddPeerEndpoint(t *testing.T) {
if got2 != tt.wantServerTwo {
t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo)
}
hubServer.Stop()
hubServer2.Stop()
})
}
@ -243,6 +252,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
args3 := server.MakeDefaultTestArgs()
args2.Port = 50052
args3.Port = 50053
args.DisableStartNotifier = false
args2.DisableStartNotifier = false
args3.DisableStartNotifier = false
args2.NotifierPort = "18081"
args3.NotifierPort = "18082"
tests := []struct {
name string
@ -296,9 +310,9 @@ func TestAddPeerEndpoint2(t *testing.T) {
log.Println(err)
}
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
hubServer3.GrpcServer.GracefulStop()
// hubServer.GrpcServer.GracefulStop()
// hubServer2.GrpcServer.GracefulStop()
// hubServer3.GrpcServer.GracefulStop()
got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()()
got3 := hubServer3.GetNumPeersExported()()
@ -311,6 +325,9 @@ func TestAddPeerEndpoint2(t *testing.T) {
if got3 != tt.wantServerThree {
t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree)
}
hubServer.Stop()
hubServer2.Stop()
hubServer3.Stop()
})
}
@ -325,6 +342,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
args3 := server.MakeDefaultTestArgs()
args2.Port = 50052
args3.Port = 50053
args.DisableStartNotifier = false
args2.DisableStartNotifier = false
args3.DisableStartNotifier = false
args2.NotifierPort = "18081"
args3.NotifierPort = "18082"
tests := []struct {
name string
@ -386,9 +408,13 @@ func TestAddPeerEndpoint3(t *testing.T) {
log.Println(err)
}
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
hubServer3.GrpcServer.GracefulStop()
// hubServer.GrpcServer.GracefulStop()
// hubServer2.GrpcServer.GracefulStop()
// hubServer3.GrpcServer.GracefulStop()
hubServer.Stop()
hubServer2.Stop()
hubServer3.Stop()
got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()()
got3 := hubServer3.GetNumPeersExported()()
@ -411,9 +437,9 @@ func TestUDPServer(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false
args2 := server.MakeDefaultTestArgs()
args2.Port = 50052
args.DisableStartUDP = false
args2.DisableStartUDP = false
tests := []struct {
@ -444,8 +470,10 @@ func TestUDPServer(t *testing.T) {
log.Println(err)
}
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
// hubServer.GrpcServer.GracefulStop()
// hubServer2.GrpcServer.GracefulStop()
hubServer.Stop()
hubServer2.Stop()
got1 := hubServer.ExternalIP.String()
if got1 != tt.want {

View file

@ -0,0 +1,24 @@
package server
import (
log "github.com/sirupsen/logrus"
)
type PeersService struct{}
type PeersSubscribeReq struct {
Ip string `json:"ip"`
Host string `json:"host"`
Details []string `json:"details"`
}
type PeersSubscribeResp struct{}
// Features is the json rpc endpoint for 'server.peers.subcribe'.
func (t *ServerService) PeersSubscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error {
log.Println("PeersSubscribe")
*res = nil
return nil
}

View file

@ -134,6 +134,14 @@ fail1:
goto fail2
}
// Register "server.peers" handlers.
peersSvc := &PeersService{}
err = s1.RegisterTCPService(peersSvc, "server_peers")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)

View file

@ -57,6 +57,8 @@ func (s *Server) RunNotifier() error {
case internal.HeightHash:
heightHash, _ := notification.(internal.HeightHash)
s.DoNotify(&heightHash)
default:
logrus.Warn("unknown notification type")
}
s.sessionManager.doNotify(notification)
}
@ -65,6 +67,7 @@ func (s *Server) RunNotifier() error {
// NotifierServer implements the TCP protocol for height/blockheader notifications
func (s *Server) NotifierServer() error {
s.Grp.Add(1)
address := ":" + s.Args.NotifierPort
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
@ -77,11 +80,27 @@ func (s *Server) NotifierServer() error {
}
defer listen.Close()
rdyCh := make(chan bool)
for {
var conn net.Conn
var err error
logrus.Info("Waiting for connection")
conn, err := listen.Accept()
go func() {
conn, err = listen.Accept()
rdyCh <- true
}()
select {
case <-s.Grp.Ch():
s.Grp.Done()
return nil
case <-rdyCh:
logrus.Info("Connection accepted")
}
if err != nil {
logrus.Warn(err)
continue

View file

@ -15,6 +15,26 @@ import (
const defaultBufferSize = 1024
func subReady(s *server.Server) error {
sleepTime := time.Millisecond * 100
for {
if sleepTime > time.Second {
return fmt.Errorf("timeout")
}
s.HeightSubsMut.RLock()
if len(s.HeightSubs) > 0 {
s.HeightSubsMut.RUnlock()
return nil
}
s.HeightSubsMut.RUnlock()
logrus.Warn("waiting for subscriber")
time.Sleep(sleepTime)
sleepTime = sleepTime * 2
}
}
func tcpConnReady(addr string) (net.Conn, error) {
sleepTime := time.Millisecond * 100
for {
@ -55,6 +75,8 @@ func TestNotifierServer(t *testing.T) {
go hub.NotifierServer()
go hub.RunNotifier()
// time.Sleep(time.Second * 2)
addr := fmt.Sprintf(":%s", args.NotifierPort)
logrus.Info(addr)
conn, err := tcpConnReady(addr)
@ -77,7 +99,11 @@ func TestNotifierServer(t *testing.T) {
// Hacky but needed because if the reader isn't ready
// before the writer sends it won't get the data
time.Sleep(time.Second)
// time.Sleep(time.Second * 10)
err = subReady(hub)
if err != nil {
t.Fatal(err)
}
hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
logrus.Warn("sending hash")

View file

@ -56,6 +56,7 @@ type Server struct {
HeightSubsMut sync.RWMutex
NotifierChan chan interface{}
Grp *stop.Group
notiferListener *net.TCPListener
sessionManager *sessionManager
pb.UnimplementedHubServer
}
@ -153,6 +154,27 @@ func (s *Server) Run() {
}
}
func (s *Server) Stop() {
log.Println("Shutting down server...")
if s.EsClient != nil {
log.Println("Stopping es client...")
s.EsClient.Stop()
}
if s.GrpcServer != nil {
log.Println("Stopping grpc server...")
s.GrpcServer.GracefulStop()
}
log.Println("Stopping other server threads...")
s.Grp.StopAndWait()
if s.DB != nil {
log.Println("Stopping database connection...")
s.DB.Shutdown()
}
log.Println("Returning from Stop...")
}
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil {
@ -338,7 +360,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
ExternalIP: net.IPv4(127, 0, 0, 1),
HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}),
NotifierChan: make(chan interface{}, 1),
Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
}

View file

@ -33,6 +33,11 @@ type hashXNotification struct {
statusStr string
}
type peerNotification struct {
address string
port string
}
type session struct {
id uintptr
addr net.Addr
@ -41,6 +46,8 @@ type session struct {
hashXSubs map[[HASHX_LEN]byte]string
// headersSub indicates header subscription
headersSub bool
// peersSub indicates peer subscription
peersBool bool
// headersSubRaw indicates the header subscription mode
headersSubRaw bool
// client provides the ability to send notifications
@ -95,6 +102,11 @@ func (s *session) doNotify(notification interface{}) {
status = hex.EncodeToString(note.status)
}
params = []string{orig, status}
case peerNotification:
note, _ := notification.(peerNotification)
method = "server.peers.subscribe"
params = []string{note.address, note.port}
default:
log.Warnf("unknown notification type: %v", notification)
return
@ -127,6 +139,8 @@ type sessionManager struct {
db *db.ReadOnlyDBColumnFamily
args *Args
chain *chaincfg.Params
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
peerSubs sessionMap
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
@ -143,6 +157,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou
db: db,
args: args,
chain: chain,
peerSubs: make(sessionMap),
headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
}
@ -211,6 +226,13 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
log.Errorf("RegisterName: %v\n", err)
}
// Register "server.peers" handlers.
peersSvc := &PeersService{}
err = s1.RegisterName("server.peers", peersSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db}
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
@ -351,6 +373,8 @@ func (sm *sessionManager) doNotify(notification interface{}) {
if len(subsCopy) > 0 {
note.statusStr = hex.EncodeToString(note.status)
}
case peerNotification:
subsCopy = sm.peerSubs
default:
log.Warnf("unknown notification type: %v", notification)
}

View file

@ -49,6 +49,9 @@ func interruptListener() <-chan struct{} {
case sig := <-interruptChannel:
log.Infof("Received signal (%s). Already "+
"shutting down...", sig)
case <-shutdownRequestChannel:
log.Info("Shutdown requested. Already " +
"shutting down...")
}
}
}()

View file

@ -10,12 +10,9 @@ package main
import (
"os"
"syscall"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// initsignals sets the signals to be caught by the signal handler
func initsignals(stopCh stop.Chan) {
shutdownRequestChannel = stopCh
func initsignals() {
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}