diff --git a/db/db.go b/db/db.go index ef3da42..451415c 100644 --- a/db/db.go +++ b/db/db.go @@ -670,7 +670,6 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { - db.Grp.StopAndWait() log.Println("Calling cleanup...") db.Cleanup() log.Println("Leaving Shutdown...") diff --git a/db/db_get.go b/db/db_get.go index b6f0c10..4fcb8c2 100644 --- a/db/db_get.go +++ b/db/db_get.go @@ -958,7 +958,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM } blockTxsCache[txHeight] = txs } - blockTxs, _ := blockTxsCache[txHeight] + blockTxs := blockTxsCache[txHeight] results = append(results, TxMerkle{ TxHash: txNumKey.TxHash, RawTx: txVal.RawTx, @@ -970,6 +970,45 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM return results, nil } +func (db *ReadOnlyDBColumnFamily) GetClaimByID(claimID string) ([]*ExpandedResolveResult, []*ExpandedResolveResult, error) { + rows := make([]*ExpandedResolveResult, 0) + extras := make([]*ExpandedResolveResult, 0) + claimHash, err := hex.DecodeString(claimID) + if err != nil { + return nil, nil, err + } + + stream, err := db.FsGetClaimByHash(claimHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Stream = &optionalResolveResultOrError{res: stream} + rows = append(rows, res) + + if stream != nil && stream.ChannelHash != nil { + channel, err := db.FsGetClaimByHash(stream.ChannelHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Channel = &optionalResolveResultOrError{res: channel} + extras = append(extras, res) + } + + if stream != nil && stream.RepostedClaimHash != nil { + repost, err := db.FsGetClaimByHash(stream.RepostedClaimHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Repost = &optionalResolveResultOrError{res: repost} + extras = append(extras, res) + } + + return rows, extras, nil +} + func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) { handle, err := db.EnsureHandle(prefixes.DBState) if err != nil { diff --git a/db/db_test.go b/db/db_test.go index e7114eb..0eccddf 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/csv" "encoding/hex" - "log" "os" "strings" "testing" @@ -14,6 +13,7 @@ import ( "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" + log "github.com/sirupsen/logrus" ) //////////////////////////////////////////////////////////////////////////////// diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go index 73e529f..b8b7675 100644 --- a/db/prefixes/prefixes.go +++ b/db/prefixes/prefixes.go @@ -12,13 +12,13 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "log" "reflect" "sort" "strings" "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg/chainhash" + log "github.com/sirupsen/logrus" ) const ( diff --git a/db/prefixes/prefixes_test.go b/db/prefixes/prefixes_test.go index 125c5ef..07bde76 100644 --- a/db/prefixes/prefixes_test.go +++ b/db/prefixes/prefixes_test.go @@ -6,7 +6,6 @@ import ( "encoding/csv" "encoding/hex" "fmt" - "log" "math" "math/big" "os" @@ -16,6 +15,7 @@ import ( dbpkg "github.com/lbryio/herald.go/db" prefixes "github.com/lbryio/herald.go/db/prefixes" "github.com/linxGnu/grocksdb" + log "github.com/sirupsen/logrus" ) func TestPrefixRegistry(t *testing.T) { diff --git a/main.go b/main.go index df2a62b..5ebf656 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "strconv" "time" _ "net/http/pprof" @@ -14,6 +13,7 @@ import ( "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func main() { @@ -29,43 +29,22 @@ func main() { if args.CmdType == server.ServeCmd { // This will cancel goroutines with the server finishes. - // ctxWCancel, cancel := context.WithCancel(ctx) - // defer cancel() stopGroup := stop.New() - // defer stopGroup.Stop() - initsignals(stopGroup.Ch()) + initsignals() interrupt := interruptListener() - // s := server.MakeHubServer(ctxWCancel, args) s := server.MakeHubServer(stopGroup, args) go s.Run() - defer func() { - log.Println("Shutting down server...") - - if s.EsClient != nil { - log.Println("Stopping es client...") - s.EsClient.Stop() - } - if s.GrpcServer != nil { - log.Println("Stopping grpc server...") - s.GrpcServer.GracefulStop() - } - if s.DB != nil { - log.Println("Stopping database connection...") - s.DB.Shutdown() - } - - log.Println("Returning from main...") - }() + defer s.Stop() <-interrupt return } - conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), - grpc.WithInsecure(), + conn, err := grpc.Dial("localhost:"+fmt.Sprintf("%d", args.Port), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { diff --git a/server/args.go b/server/args.go index 491cbd4..6a18ffd 100644 --- a/server/args.go +++ b/server/args.go @@ -26,9 +26,9 @@ type Args struct { DBPath string Chain *string EsHost string - EsPort string - PrometheusPort string - NotifierPort string + EsPort int + PrometheusPort int + NotifierPort int JSONRPCPort int JSONRPCHTTPPort int MaxSessions int @@ -71,9 +71,9 @@ const ( DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultEsHost = "http://localhost" DefaultEsIndex = "claims" - DefaultEsPort = "9200" - DefaultPrometheusPort = "2112" - DefaultNotifierPort = "18080" + DefaultEsPort = 9200 + DefaultPrometheusPort = 2112 + DefaultNotifierPort = 18080 DefaultJSONRPCPort = 50001 DefaultJSONRPCHTTPPort = 50002 DefaultMaxSessions = 10000 @@ -219,9 +219,9 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost}) - esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) - prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) - notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) + esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) + prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) + notifierPort := parser.Int("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort}) jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort}) maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions}) @@ -334,11 +334,17 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { } if esPort, ok := environment["ELASTIC_PORT"]; ok { - args.EsPort = esPort + args.EsPort, err = strconv.Atoi(esPort) + if err != nil { + log.Fatal(err) + } } if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok { - args.PrometheusPort = prometheusPort + args.PrometheusPort, err = strconv.Atoi(prometheusPort) + if err != nil { + log.Fatal(err) + } } /* diff --git a/server/federation.go b/server/federation.go index 6bd9ae5..82d3f04 100644 --- a/server/federation.go +++ b/server/federation.go @@ -3,7 +3,6 @@ package server import ( "bufio" "context" - "log" "math" "net" "os" @@ -14,7 +13,9 @@ import ( "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Peer holds relevant information about peers that we know about. @@ -99,7 +100,7 @@ retry: time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2))) conn, err := grpc.DialContext(ctx, "0.0.0.0:"+port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) @@ -172,7 +173,7 @@ func (s *Server) subscribeToPeer(peer *Peer) error { conn, err := grpc.DialContext(ctx, peer.Address+":"+peer.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -208,7 +209,7 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) { conn, err := grpc.DialContext(ctx, peer.Address+":"+peer.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -278,7 +279,7 @@ func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error { conn, err := grpc.DialContext(ctx, peerToNotify.Address+":"+peerToNotify.Port, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -370,6 +371,10 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error { metrics.PeersKnown.Inc() s.writePeers() s.notifyPeerSubs(newPeer) + // This is weird because we're doing grpc and jsonrpc here. + // Do we still want to custom grpc? + log.Warn("Sending peer to NotifierChan") + s.NotifierChan <- peerNotification{newPeer.Address, newPeer.Port} // Subscribe to all our peers for now if subscribe { diff --git a/server/federation_test.go b/server/federation_test.go index a332650..d77c5a3 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "fmt" - "log" "net" "os" "strconv" @@ -16,13 +15,20 @@ import ( "github.com/lbryio/herald.go/server" "github.com/lbryio/lbry.go/v3/extras/stop" dto "github.com/prometheus/client_model/go" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // lineCountFile takes a fileName and counts the number of lines in it. func lineCountFile(fileName string) int { f, err := os.Open(fileName) - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + log.Warn(err) + } + }() if err != nil { log.Println(err) return 0 @@ -51,6 +57,7 @@ func TestAddPeer(t *testing.T) { // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() + args.DisableStartNotifier = false tests := []struct { name string @@ -100,6 +107,7 @@ func TestAddPeer(t *testing.T) { if got != tt.want { t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want) } + hubServer.Stop() }) } @@ -107,10 +115,10 @@ func TestAddPeer(t *testing.T) { // TestPeerWriter tests that peers get written properly func TestPeerWriter(t *testing.T) { - // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args.DisableWritePeers = false + args.DisableStartNotifier = false tests := []struct { name string @@ -145,17 +153,16 @@ func TestPeerWriter(t *testing.T) { Port: "50051", } } - //log.Printf("Adding peer %+v\n", peer) err := hubServer.AddPeerExported()(peer, false, false) if err != nil { log.Println(err) } } - //log.Println("Counting lines...") got := lineCountFile(hubServer.Args.PeerFile) if got != tt.want { t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want) } + hubServer.Stop() }) } @@ -164,11 +171,13 @@ func TestPeerWriter(t *testing.T) { // TestAddPeerEndpoint tests the ability to add peers func TestAddPeerEndpoint(t *testing.T) { - // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() + args.DisableStartNotifier = false args2 := server.MakeDefaultTestArgs() + args2.DisableStartNotifier = false args2.Port = 50052 + args2.NotifierPort = 18081 tests := []struct { name string @@ -198,9 +207,8 @@ func TestAddPeerEndpoint(t *testing.T) { metrics.PeersKnown.Set(0) go hubServer.Run() go hubServer2.Run() - //go hubServer.Run() conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -219,8 +227,6 @@ func TestAddPeerEndpoint(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() if got1 != tt.wantServerOne { @@ -229,6 +235,8 @@ func TestAddPeerEndpoint(t *testing.T) { if got2 != tt.wantServerTwo { t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) } + hubServer.Stop() + hubServer2.Stop() }) } @@ -236,13 +244,17 @@ func TestAddPeerEndpoint(t *testing.T) { // TestAddPeerEndpoint2 tests the ability to add peers func TestAddPeerEndpoint2(t *testing.T) { - // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() args2.Port = 50052 args3.Port = 50053 + args.DisableStartNotifier = false + args2.DisableStartNotifier = false + args3.DisableStartNotifier = false + args2.NotifierPort = 18081 + args3.NotifierPort = 18082 tests := []struct { name string @@ -268,7 +280,7 @@ func TestAddPeerEndpoint2(t *testing.T) { go hubServer2.Run() go hubServer3.Run() conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -296,9 +308,6 @@ func TestAddPeerEndpoint2(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() - hubServer3.GrpcServer.GracefulStop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()() @@ -311,6 +320,9 @@ func TestAddPeerEndpoint2(t *testing.T) { if got3 != tt.wantServerThree { t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree) } + hubServer.Stop() + hubServer2.Stop() + hubServer3.Stop() }) } @@ -318,13 +330,17 @@ func TestAddPeerEndpoint2(t *testing.T) { // TestAddPeerEndpoint3 tests the ability to add peers func TestAddPeerEndpoint3(t *testing.T) { - // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() args2.Port = 50052 args3.Port = 50053 + args.DisableStartNotifier = false + args2.DisableStartNotifier = false + args3.DisableStartNotifier = false + args2.NotifierPort = 18081 + args3.NotifierPort = 18082 tests := []struct { name string @@ -350,14 +366,14 @@ func TestAddPeerEndpoint3(t *testing.T) { go hubServer2.Run() go hubServer3.Run() conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { log.Fatalf("did not connect: %v", err) } conn2, err := grpc.Dial("localhost:50052", - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) if err != nil { @@ -386,9 +402,9 @@ func TestAddPeerEndpoint3(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() - hubServer3.GrpcServer.GracefulStop() + hubServer.Stop() + hubServer2.Stop() + hubServer3.Stop() got1 := hubServer.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()() @@ -408,12 +424,11 @@ func TestAddPeerEndpoint3(t *testing.T) { // TestAddPeer tests the ability to add peers func TestUDPServer(t *testing.T) { - // ctx := context.Background() ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() - args.DisableStartUDP = false args2 := server.MakeDefaultTestArgs() args2.Port = 50052 + args.DisableStartUDP = false args2.DisableStartUDP = false tests := []struct { @@ -444,8 +459,8 @@ func TestUDPServer(t *testing.T) { log.Println(err) } - hubServer.GrpcServer.GracefulStop() - hubServer2.GrpcServer.GracefulStop() + hubServer.Stop() + hubServer2.Stop() got1 := hubServer.ExternalIP.String() if got1 != tt.want { diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index 4616d20..829bdf8 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -185,6 +185,9 @@ func TestHeaders(t *testing.T) { } var resp *BlockHeadersResp err := s.Headers(&req, &resp) + if err != nil { + t.Errorf("Headers: %v", err) + } marshalled, err := json.MarshalIndent(resp, "", " ") if err != nil { t.Errorf("height: %v unmarshal err: %v", height, err) @@ -204,7 +207,7 @@ func TestHeadersSubscribe(t *testing.T) { return } - sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) + sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() @@ -385,7 +388,7 @@ func TestAddressSubscribe(t *testing.T) { return } - sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) + sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() diff --git a/server/jsonrpc_claimtrie.go b/server/jsonrpc_claimtrie.go index a8ef42b..f70e462 100644 --- a/server/jsonrpc_claimtrie.go +++ b/server/jsonrpc_claimtrie.go @@ -1,13 +1,19 @@ package server import ( + "context" + "fmt" + "github.com/lbryio/herald.go/db" + "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) type ClaimtrieService struct { - DB *db.ReadOnlyDBColumnFamily + DB *db.ReadOnlyDBColumnFamily + Server *Server } type ResolveData struct { @@ -18,6 +24,10 @@ type Result struct { Data string `json:"data"` } +type GetClaimByIDData struct { + ClaimID string `json:"claim_id"` +} + // Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'. func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error { log.Println("Resolve") @@ -25,3 +35,74 @@ func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error *result = res return err } + +// Search is the json rpc endpoint for 'blockchain.claimtrie.search'. +func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error { + log.Println("Search") + if t.Server == nil { + log.Warnln("Server is nil in Search") + *result = nil + return nil + } + ctx := context.Background() + res, err := t.Server.Search(ctx, args) + *result = res + return err +} + +// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'. +func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error { + log.Println("GetClaimByID") + if len(args.ClaimID) != 40 { + *result = nil + return fmt.Errorf("len(claim_id) != 40") + } + + rows, extras, err := t.DB.GetClaimByID(args.ClaimID) + if err != nil { + *result = nil + return err + } + + metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc() + + // FIXME: this has txos and extras and so does GetClaimById? + allTxos := make([]*pb.Output, 0) + allExtraTxos := make([]*pb.Output, 0) + + for _, row := range rows { + txos, extraTxos, err := row.ToOutputs() + if err != nil { + *result = nil + return err + } + // TODO: there may be a more efficient way to do this. + allTxos = append(allTxos, txos...) + allExtraTxos = append(allExtraTxos, extraTxos...) + } + + for _, extra := range extras { + txos, extraTxos, err := extra.ToOutputs() + if err != nil { + *result = nil + return err + } + // TODO: there may be a more efficient way to do this. + allTxos = append(allTxos, txos...) + allExtraTxos = append(allExtraTxos, extraTxos...) + } + + res := &pb.Outputs{ + Txos: allTxos, + ExtraTxos: allExtraTxos, + Total: uint32(len(allTxos) + len(allExtraTxos)), + Offset: 0, //TODO + Blocked: nil, //TODO + BlockedTotal: 0, //TODO + } + + log.Warn(res) + + *result = res + return nil +} diff --git a/server/jsonrpc_federation.go b/server/jsonrpc_federation.go new file mode 100644 index 0000000..3e91489 --- /dev/null +++ b/server/jsonrpc_federation.go @@ -0,0 +1,39 @@ +package server + +import ( + "errors" + + log "github.com/sirupsen/logrus" +) + +type PeersService struct { + Server *Server + // needed for subscribe/unsubscribe + sessionMgr *sessionManager + session *session +} + +type PeersSubscribeReq struct { + Ip string `json:"ip"` + Host string `json:"host"` + Details []string `json:"details"` +} + +type PeersSubscribeResp string + +// Features is the json rpc endpoint for 'server.peers.subcribe'. +func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error { + log.Println("PeersSubscribe") + // var port = "50001" + + // FIXME: Get the actual port from the request details + + if t.sessionMgr == nil || t.session == nil { + *res = nil + return errors.New("no session, rpc not supported") + } + t.sessionMgr.peersSubscribe(t.session, true /*subscribe*/) + + *res = nil + return nil +} diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index b846c00..b35036f 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -91,7 +91,7 @@ fail1: s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") // Register "blockchain.claimtrie.*"" handlers. - claimtrieSvc := &ClaimtrieService{s.DB} + claimtrieSvc := &ClaimtrieService{s.DB, s} err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) @@ -134,6 +134,14 @@ fail1: goto fail2 } + // Register "server.peers" handlers. + peersSvc := &PeersService{Server: s} + err = s1.RegisterTCPService(peersSvc, "server_peers") + if err != nil { + log.Errorf("RegisterTCPService: %v\n", err) + goto fail2 + } + r := gorilla_mux.NewRouter() r.Handle("/rpc", s1) port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10) diff --git a/server/notifier.go b/server/notifier.go index 1f4a847..a3a16a2 100644 --- a/server/notifier.go +++ b/server/notifier.go @@ -2,6 +2,7 @@ package server import ( "encoding/binary" + "fmt" "net" "github.com/lbryio/herald.go/internal" @@ -53,10 +54,16 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error { // RunNotifier Runs the notfying action forever func (s *Server) RunNotifier() error { for notification := range s.NotifierChan { - switch notification.(type) { + switch note := notification.(type) { case internal.HeightHash: - heightHash, _ := notification.(internal.HeightHash) + heightHash := note s.DoNotify(&heightHash) + // Do we need this? + // case peerNotification: + // peer, _ := notification.(peerNotification) + // s.notifyPeerSubs(&Peer{Address: peer.address, Port: peer.port}) + default: + logrus.Warn("unknown notification type") } s.sessionManager.doNotify(notification) } @@ -65,7 +72,8 @@ func (s *Server) RunNotifier() error { // NotifierServer implements the TCP protocol for height/blockheader notifications func (s *Server) NotifierServer() error { - address := ":" + s.Args.NotifierPort + s.Grp.Add(1) + address := ":" + fmt.Sprintf("%d", s.Args.NotifierPort) addr, err := net.ResolveTCPAddr("tcp", address) if err != nil { return err @@ -77,11 +85,27 @@ func (s *Server) NotifierServer() error { } defer listen.Close() + rdyCh := make(chan bool) for { + var conn net.Conn + var err error logrus.Info("Waiting for connection") - conn, err := listen.Accept() + + go func() { + conn, err = listen.Accept() + rdyCh <- true + }() + + select { + case <-s.Grp.Ch(): + s.Grp.Done() + return nil + case <-rdyCh: + logrus.Info("Connection accepted") + } + if err != nil { logrus.Warn(err) continue diff --git a/server/notifier_test.go b/server/notifier_test.go index d482b60..0bcaa45 100644 --- a/server/notifier_test.go +++ b/server/notifier_test.go @@ -15,6 +15,26 @@ import ( const defaultBufferSize = 1024 +func subReady(s *server.Server) error { + sleepTime := time.Millisecond * 100 + for { + if sleepTime > time.Second { + return fmt.Errorf("timeout") + } + s.HeightSubsMut.RLock() + if len(s.HeightSubs) > 0 { + s.HeightSubsMut.RUnlock() + return nil + } + s.HeightSubsMut.RUnlock() + + logrus.Warn("waiting for subscriber") + time.Sleep(sleepTime) + sleepTime = sleepTime * 2 + + } +} + func tcpConnReady(addr string) (net.Conn, error) { sleepTime := time.Millisecond * 100 for { @@ -48,14 +68,13 @@ func tcpRead(conn net.Conn) ([]byte, error) { func TestNotifierServer(t *testing.T) { args := server.MakeDefaultTestArgs() - // ctx := context.Background() ctx := stop.NewDebug() hub := server.MakeHubServer(ctx, args) go hub.NotifierServer() go hub.RunNotifier() - addr := fmt.Sprintf(":%s", args.NotifierPort) + addr := fmt.Sprintf(":%d", args.NotifierPort) logrus.Info(addr) conn, err := tcpConnReady(addr) if err != nil { @@ -77,7 +96,10 @@ func TestNotifierServer(t *testing.T) { // Hacky but needed because if the reader isn't ready // before the writer sends it won't get the data - time.Sleep(time.Second) + err = subReady(hub) + if err != nil { + t.Fatal(err) + } hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") logrus.Warn("sending hash") diff --git a/server/server.go b/server/server.go index d128089..c88f090 100644 --- a/server/server.go +++ b/server/server.go @@ -7,8 +7,7 @@ import ( "errors" "fmt" "hash" - "io/ioutil" - "log" + golog "log" "net" "net/http" "os" @@ -27,7 +26,7 @@ import ( "github.com/olivere/elastic/v7" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - logrus "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -56,6 +55,7 @@ type Server struct { HeightSubsMut sync.RWMutex NotifierChan chan interface{} Grp *stop.Group + notiferListener *net.TCPListener sessionManager *sessionManager pb.UnimplementedHubServer } @@ -153,29 +153,50 @@ func (s *Server) Run() { } } +func (s *Server) Stop() { + log.Println("Shutting down server...") + + if s.EsClient != nil { + log.Println("Stopping es client...") + s.EsClient.Stop() + } + if s.GrpcServer != nil { + log.Println("Stopping grpc server...") + s.GrpcServer.GracefulStop() + } + log.Println("Stopping other server threads...") + s.Grp.StopAndWait() + if s.DB != nil { + log.Println("Stopping database connection...") + s.DB.Shutdown() + } + + log.Println("Returning from Stop...") +} + func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { - tmpName, err := ioutil.TempDir("", "go-lbry-hub") + tmpName, err := os.MkdirTemp("", "go-lbry-hub") if err != nil { - logrus.Info(err) + log.Info(err) log.Fatal(err) } - logrus.Info("tmpName", tmpName) + log.Info("tmpName", tmpName) if err != nil { - logrus.Info(err) + log.Info(err) } myDB, err := db.GetProdDB(args.DBPath, tmpName, grp) if err != nil { // Can't load the db, fail loudly - logrus.Info(err) + log.Info(err) log.Fatalln(err) } if myDB.LastState != nil { - logrus.Infof("DB version: %v", myDB.LastState.DBVersion) - logrus.Infof("height: %v", myDB.LastState.Height) - logrus.Infof("genesis: %v", myDB.LastState.Genesis.String()) - logrus.Infof("tip: %v", myDB.LastState.Tip.String()) - logrus.Infof("tx count: %v", myDB.LastState.TxCount) + log.Infof("DB version: %v", myDB.LastState.DBVersion) + log.Infof("height: %v", myDB.LastState.Height) + log.Infof("genesis: %v", myDB.LastState.Genesis.String()) + log.Infof("tip: %v", myDB.LastState.Tip.String()) + log.Infof("tx count: %v", myDB.LastState.TxCount) } blockingChannelHashes := make([][]byte, 0, 10) @@ -186,7 +207,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro for _, id := range args.BlockingChannelIds { hash, err := hex.DecodeString(id) if err != nil { - logrus.Warn("Invalid channel id: ", id) + log.Warn("Invalid channel id: ", id) continue } blockingChannelHashes = append(blockingChannelHashes, hash) @@ -196,7 +217,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro for _, id := range args.FilteringChannelIds { hash, err := hex.DecodeString(id) if err != nil { - logrus.Warn("Invalid channel id: ", id) + log.Warn("Invalid channel id: ", id) continue } filteringChannelHashes = append(filteringChannelHashes, hash) @@ -207,10 +228,10 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro myDB.FilteringChannelHashes = filteringChannelHashes if len(filteringIds) > 0 { - logrus.Infof("filtering claims reposted by channels: %+s", filteringIds) + log.Infof("filtering claims reposted by channels: %+s", filteringIds) } if len(blockingIds) > 0 { - logrus.Infof("blocking claims reposted by channels: %+s", blockingIds) + log.Infof("blocking claims reposted by channels: %+s", blockingIds) } return myDB, nil @@ -234,7 +255,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { var client *elastic.Client = nil if !args.DisableEs { - esUrl := args.EsHost + ":" + args.EsPort + esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort) opts := []elastic.ClientOptionFunc{ elastic.SetSniff(true), elastic.SetSnifferTimeoutStartup(time.Second * 60), @@ -242,7 +263,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { elastic.SetURL(esUrl), } if args.Debug { - opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0))) + opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0))) } client, err = elastic.NewClient(opts...) if err != nil { @@ -271,7 +292,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { if !args.DisableResolve { myDB, err = LoadDatabase(args, grp) if err != nil { - logrus.Warning(err) + log.Warning(err) } } @@ -302,7 +323,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { chain := chaincfg.MainNetParams if dbChain != nil && cliChain != nil { if dbChain != cliChain { - logrus.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name) + log.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name) } chain = *dbChain } else if dbChain != nil { @@ -310,7 +331,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { } else if cliChain != nil { chain = *cliChain } - logrus.Infof("network: %v", chain.Name) + log.Infof("network: %v", chain.Name) args.GenesisHash = chain.GenesisHash.String() @@ -338,34 +359,36 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { ExternalIP: net.IPv4(127, 0, 0, 1), HeightSubs: make(map[net.Addr]net.Conn), HeightSubsMut: sync.RWMutex{}, - NotifierChan: make(chan interface{}), + NotifierChan: make(chan interface{}, 1), Grp: grp, - sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), + sessionManager: nil, } + // FIXME: HACK + s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain) // Start up our background services if !args.DisableResolve && !args.DisableRocksDBRefresh { - logrus.Info("Running detect changes") + log.Info("Running detect changes") myDB.RunDetectChanges(s.NotifierChan) } if !args.DisableBlockingAndFiltering { myDB.RunGetBlocksAndFilters() } if !args.DisableStartPrometheus { - go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics") + go s.prometheusEndpoint(fmt.Sprintf("%d", s.Args.PrometheusPort), "metrics") } if !args.DisableStartUDP { go func() { err := s.UDPServer(s.Args.Port) if err != nil { - logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err) + log.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err) } }() if s.Args.JSONRPCPort != 0 { go func() { err := s.UDPServer(s.Args.JSONRPCPort) if err != nil { - logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err) + log.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err) } }() } @@ -409,7 +432,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { // for this hub to allow for metric tracking. func (s *Server) prometheusEndpoint(port string, endpoint string) { http.Handle("/"+endpoint, promhttp.Handler()) - log.Println(fmt.Sprintf("listening on :%s /%s", port, endpoint)) + log.Printf("listening on :%s /%s\n", port, endpoint) err := http.ListenAndServe(":"+port, nil) log.Fatalln("Shouldn't happen??!?!", err) } @@ -567,7 +590,7 @@ func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs, BlockedTotal: 0, //TODO } - logrus.Warn(res) + log.Warn(res) return res, nil } diff --git a/server/server_test_pkg.go b/server/server_test_pkg.go index 31230c7..3d64879 100644 --- a/server/server_test_pkg.go +++ b/server/server_test_pkg.go @@ -14,6 +14,6 @@ func (s *Server) GetNumPeersExported() func() int64 { return s.getNumPeers } -func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { - return newSessionManager(db, args, grp, chain) +func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { + return newSessionManager(server, db, args, grp, chain) } diff --git a/server/session.go b/server/session.go index 0b9136b..6413d06 100644 --- a/server/session.go +++ b/server/session.go @@ -33,6 +33,11 @@ type hashXNotification struct { statusStr string } +type peerNotification struct { + address string + port string +} + type session struct { id uintptr addr net.Addr @@ -41,6 +46,8 @@ type session struct { hashXSubs map[[HASHX_LEN]byte]string // headersSub indicates header subscription headersSub bool + // peersSub indicates peer subscription + peersSub bool // headersSubRaw indicates the header subscription mode headersSubRaw bool // client provides the ability to send notifications @@ -55,12 +62,11 @@ type session struct { func (s *session) doNotify(notification interface{}) { var method string var params interface{} - switch notification.(type) { + switch note := notification.(type) { case headerNotification: if !s.headersSub { return } - note, _ := notification.(headerNotification) heightHash := note.HeightHash method = "blockchain.headers.subscribe" if s.headersSubRaw { @@ -80,7 +86,6 @@ func (s *session) doNotify(notification interface{}) { params = header } case hashXNotification: - note, _ := notification.(hashXNotification) orig, ok := s.hashXSubs[note.hashX] if !ok { return @@ -95,6 +100,13 @@ func (s *session) doNotify(notification interface{}) { status = hex.EncodeToString(note.status) } params = []string{orig, status} + case peerNotification: + if !s.peersSub { + return + } + method = "server.peers.subscribe" + params = []string{note.address, note.port} + default: log.Warnf("unknown notification type: %v", notification) return @@ -126,14 +138,17 @@ type sessionManager struct { manageTicker *time.Ticker db *db.ReadOnlyDBColumnFamily args *Args + server *Server chain *chaincfg.Params + // peerSubs are sessions subscribed via 'blockchain.peers.subscribe' + peerSubs sessionMap // headerSubs are sessions subscribed via 'blockchain.headers.subscribe' headerSubs sessionMap // hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe' hashXSubs map[[HASHX_LEN]byte]sessionMap } -func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { +func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { return &sessionManager{ sessions: make(sessionMap), grp: grp, @@ -142,7 +157,9 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second), db: db, args: args, + server: server, chain: chain, + peerSubs: make(sessionMap), headerSubs: make(sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), } @@ -211,8 +228,15 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { log.Errorf("RegisterName: %v\n", err) } + // Register "server.peers" handlers. + peersSvc := &PeersService{Server: sm.server} + err = s1.RegisterName("server.peers", peersSvc) + if err != nil { + log.Errorf("RegisterName: %v\n", err) + } + // Register "blockchain.claimtrie.*"" handlers. - claimtrieSvc := &ClaimtrieService{sm.db} + claimtrieSvc := &ClaimtrieService{sm.db, sm.server} err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) if err != nil { log.Errorf("RegisterName: %v\n", err) @@ -286,6 +310,18 @@ func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) { return nil, nil } +func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + if subscribe { + sm.peerSubs[sess.id] = sess + sess.peersSub = true + return + } + delete(sm.peerSubs, sess.id) + sess.peersSub = false +} + func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { sm.sessionsMut.Lock() defer sm.sessionsMut.Unlock() @@ -325,16 +361,15 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s } func (sm *sessionManager) doNotify(notification interface{}) { - switch notification.(type) { + switch note := notification.(type) { case internal.HeightHash: // The HeightHash notification translates to headerNotification. - notification = &headerNotification{HeightHash: notification.(internal.HeightHash)} + notification = &headerNotification{HeightHash: note} } sm.sessionsMut.RLock() var subsCopy sessionMap - switch notification.(type) { + switch note := notification.(type) { case headerNotification: - note, _ := notification.(headerNotification) subsCopy = sm.headerSubs if len(subsCopy) > 0 { hdr := [HEADER_SIZE]byte{} @@ -343,7 +378,6 @@ func (sm *sessionManager) doNotify(notification interface{}) { note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:]) } case hashXNotification: - note, _ := notification.(hashXNotification) hashXSubs, ok := sm.hashXSubs[note.hashX] if ok { subsCopy = hashXSubs @@ -351,6 +385,8 @@ func (sm *sessionManager) doNotify(notification interface{}) { if len(subsCopy) > 0 { note.statusStr = hex.EncodeToString(note.status) } + case peerNotification: + subsCopy = sm.peerSubs default: log.Warnf("unknown notification type: %v", notification) } @@ -369,8 +405,10 @@ type sessionServerCodec struct { // ReadRequestHeader provides ability to rewrite the incoming // request "method" field. For example: -// blockchain.block.get_header -> blockchain.block.Get_header -// blockchain.address.listunspent -> blockchain.address.Listunspent +// +// blockchain.block.get_header -> blockchain.block.Get_header +// blockchain.address.listunspent -> blockchain.address.Listunspent +// // This makes the "method" string compatible with rpc.Server // requirements. func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error { diff --git a/signal.go b/signal.go index 1a9b7e2..e35cc3f 100644 --- a/signal.go +++ b/signal.go @@ -49,6 +49,9 @@ func interruptListener() <-chan struct{} { case sig := <-interruptChannel: log.Infof("Received signal (%s). Already "+ "shutting down...", sig) + case <-shutdownRequestChannel: + log.Info("Shutdown requested. Already " + + "shutting down...") } } }() diff --git a/signalsigterm.go b/signalsigterm.go index c9435fc..dbd9750 100644 --- a/signalsigterm.go +++ b/signalsigterm.go @@ -10,12 +10,9 @@ package main import ( "os" "syscall" - - "github.com/lbryio/lbry.go/v3/extras/stop" ) // initsignals sets the signals to be caught by the signal handler -func initsignals(stopCh stop.Chan) { - shutdownRequestChannel = stopCh +func initsignals() { interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} }