diff --git a/db/db.go b/db/db.go index ef3da42..3630eac 100644 --- a/db/db.go +++ b/db/db.go @@ -670,7 +670,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { - db.Grp.StopAndWait() + // db.Grp.StopAndWait() log.Println("Calling cleanup...") db.Cleanup() log.Println("Leaving Shutdown...") diff --git a/main.go b/main.go index df2a62b..01138be 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "strconv" "time" _ "net/http/pprof" @@ -14,6 +13,7 @@ import ( "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func main() { @@ -34,38 +34,39 @@ func main() { stopGroup := stop.New() // defer stopGroup.Stop() - initsignals(stopGroup.Ch()) + initsignals() interrupt := interruptListener() // s := server.MakeHubServer(ctxWCancel, args) s := server.MakeHubServer(stopGroup, args) go s.Run() - defer func() { - log.Println("Shutting down server...") + // defer func() { + // log.Println("Shutting down server...") - if s.EsClient != nil { - log.Println("Stopping es client...") - s.EsClient.Stop() - } - if s.GrpcServer != nil { - log.Println("Stopping grpc server...") - s.GrpcServer.GracefulStop() - } - if s.DB != nil { - log.Println("Stopping database connection...") - s.DB.Shutdown() - } + // if s.EsClient != nil { + // log.Println("Stopping es client...") + // s.EsClient.Stop() + // } + // if s.GrpcServer != nil { + // log.Println("Stopping grpc server...") + // s.GrpcServer.GracefulStop() + // } + // if s.DB != nil { + // log.Println("Stopping database connection...") + // s.DB.Shutdown() + // } - log.Println("Returning from main...") - }() + // log.Println("Returning from main...") + // }() + defer s.Stop() <-interrupt return } - conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), - grpc.WithInsecure(), + conn, err := grpc.Dial("localhost:"+string(args.Port), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { diff --git a/server/federation.go b/server/federation.go index 6bd9ae5..6ffce60 100644 --- a/server/federation.go +++ b/server/federation.go @@ -3,7 +3,6 @@ package server import ( "bufio" "context" - "log" "math" "net" "os" @@ -14,7 +13,9 @@ import ( "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Peer holds relevant information about peers that we know about. @@ -99,7 +100,7 @@ retry: time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2))) conn, err := grpc.DialContext(ctx, "0.0.0.0:"+port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) @@ -172,7 +173,7 @@ func (s *Server) subscribeToPeer(peer *Peer) error { conn, err := grpc.DialContext(ctx, peer.Address+":"+peer.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -208,7 +209,7 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) { conn, err := grpc.DialContext(ctx, peer.Address+":"+peer.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -278,7 +279,7 @@ func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error { conn, err := grpc.DialContext(ctx, peerToNotify.Address+":"+peerToNotify.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -370,6 +371,10 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error { metrics.PeersKnown.Inc() s.writePeers() s.notifyPeerSubs(newPeer) + // This is weird because we're doing grpc and jsonrpc here. + // Do we still want to custom grpc? + log.Warn("Sending peer to NotifierChan") + s.NotifierChan <- newPeer // Subscribe to all our peers for now if subscribe { diff --git a/server/federation_test.go b/server/federation_test.go index a332650..0c09436 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -51,6 +51,7 @@ func TestAddPeer(t *testing.T) { // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() + args.DisableStartNotifier = false tests := []struct { name string @@ -92,6 +93,7 @@ func TestAddPeer(t *testing.T) { log.Println(err) } } + hubServer.Stop() var m = &dto.Metric{} if err := metrics.PeersKnown.Write(m); err != nil { t.Errorf("Error getting metrics %+v\n", err) @@ -111,6 +113,7 @@ func TestPeerWriter(t *testing.T) { ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args.DisableWritePeers = false + args.DisableStartNotifier = false tests := []struct { name string @@ -156,6 +159,7 @@ func TestPeerWriter(t *testing.T) { if got != tt.want { t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want) } + hubServer.Stop() }) } @@ -167,8 +171,11 @@ func TestAddPeerEndpoint(t *testing.T) { // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() + args.DisableStartNotifier = false args2 := server.MakeDefaultTestArgs() + args2.DisableStartNotifier = false args2.Port = 50052 + args2.NotifierPort = "18081" tests := []struct { name string @@ -219,8 +226,8 @@ func TestAddPeerEndpoint(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() + // hubServer.GrpcServer.GracefulStop() + // hubServer2.GrpcServer.GracefulStop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() if got1 != tt.wantServerOne { @@ -229,6 +236,8 @@ func TestAddPeerEndpoint(t *testing.T) { if got2 != tt.wantServerTwo { t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) } + hubServer.Stop() + hubServer2.Stop() }) } @@ -243,6 +252,11 @@ func TestAddPeerEndpoint2(t *testing.T) { args3 := server.MakeDefaultTestArgs() args2.Port = 50052 args3.Port = 50053 + args.DisableStartNotifier = false + args2.DisableStartNotifier = false + args3.DisableStartNotifier = false + args2.NotifierPort = "18081" + args3.NotifierPort = "18082" tests := []struct { name string @@ -296,9 +310,9 @@ func TestAddPeerEndpoint2(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() - hubServer3.GrpcServer.GracefulStop() + // hubServer.GrpcServer.GracefulStop() + // hubServer2.GrpcServer.GracefulStop() + // hubServer3.GrpcServer.GracefulStop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()() @@ -311,6 +325,9 @@ func TestAddPeerEndpoint2(t *testing.T) { if got3 != tt.wantServerThree { t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree) } + hubServer.Stop() + hubServer2.Stop() + hubServer3.Stop() }) } @@ -325,6 +342,11 @@ func TestAddPeerEndpoint3(t *testing.T) { args3 := server.MakeDefaultTestArgs() args2.Port = 50052 args3.Port = 50053 + args.DisableStartNotifier = false + args2.DisableStartNotifier = false + args3.DisableStartNotifier = false + args2.NotifierPort = "18081" + args3.NotifierPort = "18082" tests := []struct { name string @@ -386,9 +408,13 @@ func TestAddPeerEndpoint3(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() - hubServer3.GrpcServer.GracefulStop() + // hubServer.GrpcServer.GracefulStop() + // hubServer2.GrpcServer.GracefulStop() + // hubServer3.GrpcServer.GracefulStop() + + hubServer.Stop() + hubServer2.Stop() + hubServer3.Stop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()() @@ -411,9 +437,9 @@ func TestUDPServer(t *testing.T) { // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() - args.DisableStartUDP = false args2 := server.MakeDefaultTestArgs() args2.Port = 50052 + args.DisableStartUDP = false args2.DisableStartUDP = false tests := []struct { @@ -444,8 +470,10 @@ func TestUDPServer(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() + // hubServer.GrpcServer.GracefulStop() + // hubServer2.GrpcServer.GracefulStop() + hubServer.Stop() + hubServer2.Stop() got1 := hubServer.ExternalIP.String() if got1 != tt.want { diff --git a/server/jsonrpc_federation.go b/server/jsonrpc_federation.go new file mode 100644 index 0000000..4ae8870 --- /dev/null +++ b/server/jsonrpc_federation.go @@ -0,0 +1,24 @@ +package server + +import ( + log "github.com/sirupsen/logrus" +) + +type PeersService struct{} + +type PeersSubscribeReq struct { + Ip string `json:"ip"` + Host string `json:"host"` + Details []string `json:"details"` +} + +type PeersSubscribeResp struct{} + +// Features is the json rpc endpoint for 'server.peers.subcribe'. +func (t *ServerService) PeersSubscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error { + log.Println("PeersSubscribe") + + *res = nil + + return nil +} diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index b846c00..47f5d72 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -134,6 +134,14 @@ fail1: goto fail2 } + // Register "server.peers" handlers. + peersSvc := &PeersService{} + err = s1.RegisterTCPService(peersSvc, "server_peers") + if err != nil { + log.Errorf("RegisterTCPService: %v\n", err) + goto fail2 + } + r := gorilla_mux.NewRouter() r.Handle("/rpc", s1) port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10) diff --git a/server/notifier.go b/server/notifier.go index 1f4a847..9bd69e7 100644 --- a/server/notifier.go +++ b/server/notifier.go @@ -57,6 +57,8 @@ func (s *Server) RunNotifier() error { case internal.HeightHash: heightHash, _ := notification.(internal.HeightHash) s.DoNotify(&heightHash) + default: + logrus.Warn("unknown notification type") } s.sessionManager.doNotify(notification) } @@ -65,6 +67,7 @@ func (s *Server) RunNotifier() error { // NotifierServer implements the TCP protocol for height/blockheader notifications func (s *Server) NotifierServer() error { + s.Grp.Add(1) address := ":" + s.Args.NotifierPort addr, err := net.ResolveTCPAddr("tcp", address) if err != nil { @@ -77,11 +80,27 @@ func (s *Server) NotifierServer() error { } defer listen.Close() + rdyCh := make(chan bool) for { + var conn net.Conn + var err error logrus.Info("Waiting for connection") - conn, err := listen.Accept() + + go func() { + conn, err = listen.Accept() + rdyCh <- true + }() + + select { + case <-s.Grp.Ch(): + s.Grp.Done() + return nil + case <-rdyCh: + logrus.Info("Connection accepted") + } + if err != nil { logrus.Warn(err) continue diff --git a/server/notifier_test.go b/server/notifier_test.go index d482b60..d19f2a1 100644 --- a/server/notifier_test.go +++ b/server/notifier_test.go @@ -15,6 +15,26 @@ import ( const defaultBufferSize = 1024 +func subReady(s *server.Server) error { + sleepTime := time.Millisecond * 100 + for { + if sleepTime > time.Second { + return fmt.Errorf("timeout") + } + s.HeightSubsMut.RLock() + if len(s.HeightSubs) > 0 { + s.HeightSubsMut.RUnlock() + return nil + } + s.HeightSubsMut.RUnlock() + + logrus.Warn("waiting for subscriber") + time.Sleep(sleepTime) + sleepTime = sleepTime * 2 + + } +} + func tcpConnReady(addr string) (net.Conn, error) { sleepTime := time.Millisecond * 100 for { @@ -55,6 +75,8 @@ func TestNotifierServer(t *testing.T) { go hub.NotifierServer() go hub.RunNotifier() + // time.Sleep(time.Second * 2) + addr := fmt.Sprintf(":%s", args.NotifierPort) logrus.Info(addr) conn, err := tcpConnReady(addr) @@ -77,7 +99,11 @@ func TestNotifierServer(t *testing.T) { // Hacky but needed because if the reader isn't ready // before the writer sends it won't get the data - time.Sleep(time.Second) + // time.Sleep(time.Second * 10) + err = subReady(hub) + if err != nil { + t.Fatal(err) + } hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") logrus.Warn("sending hash") diff --git a/server/server.go b/server/server.go index d128089..7af55ac 100644 --- a/server/server.go +++ b/server/server.go @@ -56,6 +56,7 @@ type Server struct { HeightSubsMut sync.RWMutex NotifierChan chan interface{} Grp *stop.Group + notiferListener *net.TCPListener sessionManager *sessionManager pb.UnimplementedHubServer } @@ -153,6 +154,27 @@ func (s *Server) Run() { } } +func (s *Server) Stop() { + log.Println("Shutting down server...") + + if s.EsClient != nil { + log.Println("Stopping es client...") + s.EsClient.Stop() + } + if s.GrpcServer != nil { + log.Println("Stopping grpc server...") + s.GrpcServer.GracefulStop() + } + log.Println("Stopping other server threads...") + s.Grp.StopAndWait() + if s.DB != nil { + log.Println("Stopping database connection...") + s.DB.Shutdown() + } + + log.Println("Returning from Stop...") +} + func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { tmpName, err := ioutil.TempDir("", "go-lbry-hub") if err != nil { @@ -338,7 +360,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { ExternalIP: net.IPv4(127, 0, 0, 1), HeightSubs: make(map[net.Addr]net.Conn), HeightSubsMut: sync.RWMutex{}, - NotifierChan: make(chan interface{}), + NotifierChan: make(chan interface{}, 1), Grp: grp, sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), } diff --git a/server/session.go b/server/session.go index 0b9136b..2d0107c 100644 --- a/server/session.go +++ b/server/session.go @@ -33,6 +33,11 @@ type hashXNotification struct { statusStr string } +type peerNotification struct { + address string + port string +} + type session struct { id uintptr addr net.Addr @@ -41,6 +46,8 @@ type session struct { hashXSubs map[[HASHX_LEN]byte]string // headersSub indicates header subscription headersSub bool + // peersSub indicates peer subscription + peersBool bool // headersSubRaw indicates the header subscription mode headersSubRaw bool // client provides the ability to send notifications @@ -95,6 +102,11 @@ func (s *session) doNotify(notification interface{}) { status = hex.EncodeToString(note.status) } params = []string{orig, status} + case peerNotification: + note, _ := notification.(peerNotification) + method = "server.peers.subscribe" + params = []string{note.address, note.port} + default: log.Warnf("unknown notification type: %v", notification) return @@ -127,6 +139,8 @@ type sessionManager struct { db *db.ReadOnlyDBColumnFamily args *Args chain *chaincfg.Params + // peerSubs are sessions subscribed via 'blockchain.peers.subscribe' + peerSubs sessionMap // headerSubs are sessions subscribed via 'blockchain.headers.subscribe' headerSubs sessionMap // hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe' @@ -143,6 +157,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou db: db, args: args, chain: chain, + peerSubs: make(sessionMap), headerSubs: make(sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), } @@ -211,6 +226,13 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { log.Errorf("RegisterName: %v\n", err) } + // Register "server.peers" handlers. + peersSvc := &PeersService{} + err = s1.RegisterName("server.peers", peersSvc) + if err != nil { + log.Errorf("RegisterName: %v\n", err) + } + // Register "blockchain.claimtrie.*"" handlers. claimtrieSvc := &ClaimtrieService{sm.db} err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) @@ -351,6 +373,8 @@ func (sm *sessionManager) doNotify(notification interface{}) { if len(subsCopy) > 0 { note.statusStr = hex.EncodeToString(note.status) } + case peerNotification: + subsCopy = sm.peerSubs default: log.Warnf("unknown notification type: %v", notification) } diff --git a/signal.go b/signal.go index 1a9b7e2..e35cc3f 100644 --- a/signal.go +++ b/signal.go @@ -49,6 +49,9 @@ func interruptListener() <-chan struct{} { case sig := <-interruptChannel: log.Infof("Received signal (%s). Already "+ "shutting down...", sig) + case <-shutdownRequestChannel: + log.Info("Shutdown requested. Already " + + "shutting down...") } } }() diff --git a/signalsigterm.go b/signalsigterm.go index c9435fc..dbd9750 100644 --- a/signalsigterm.go +++ b/signalsigterm.go @@ -10,12 +10,9 @@ package main import ( "os" "syscall" - - "github.com/lbryio/lbry.go/v3/extras/stop" ) // initsignals sets the signals to be caught by the signal handler -func initsignals(stopCh stop.Chan) { - shutdownRequestChannel = stopCh +func initsignals() { interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} }