diff --git a/db/db.go b/db/db.go index 164a9d0..00cc9a7 100644 --- a/db/db.go +++ b/db/db.go @@ -650,7 +650,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() { // RunDetectChanges Go routine the runs continuously while the hub is active // to keep the db readonly view up to date and handle reorgs on the // blockchain. -func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan *internal.HeightHash) { +func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { go func() { lastPrint := time.Now() for { @@ -674,7 +674,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan *internal.Height } // DetectChanges keep the rocksdb db in sync and handle reorgs -func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan *internal.HeightHash) error { +func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan<- interface{}) error { err := db.DB.TryCatchUpWithPrimary() if err != nil { return err diff --git a/server/args.go b/server/args.go index 29a64d3..cfed4d8 100644 --- a/server/args.go +++ b/server/args.go @@ -3,6 +3,7 @@ package server import ( "log" "os" + "strconv" "strings" "github.com/akamensky/argparse" @@ -27,7 +28,8 @@ type Args struct { EsPort string PrometheusPort string NotifierPort string - JSONRPCPort string + JSONRPCPort *int + JSONRPCHTTPPort *int EsIndex string RefreshDelta int CacheTTL int @@ -58,7 +60,7 @@ const ( DefaultEsPort = "9200" DefaultPrometheusPort = "2112" DefaultNotifierPort = "18080" - DefaultJSONRPCPort = "50001" + DefaultJSONRPCPort = 50001 DefaultRefreshDelta = 5 DefaultCacheTTL = 5 DefaultPeerFile = "peers.txt" @@ -111,6 +113,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { searchCmd := parser.NewCommand("search", "claim search") dbCmd := parser.NewCommand("db", "db testing") + validatePort := func(arg []string) error { + _, err := strconv.ParseUint(arg[0], 10, 16) + return err + } + host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) @@ -120,7 +127,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) - jsonRPCPort := parser.String("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Default: DefaultJSONRPCPort}) + jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort}) + jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) @@ -168,7 +176,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { EsPort: *esPort, PrometheusPort: *prometheusPort, NotifierPort: *notifierPort, - JSONRPCPort: *jsonRPCPort, + JSONRPCPort: jsonRPCPort, + JSONRPCHTTPPort: jsonRPCHTTPPort, EsIndex: *esIndex, RefreshDelta: *refreshDelta, CacheTTL: *cacheTTL, diff --git a/server/federation_test.go b/server/federation_test.go index 850500f..78d9038 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -46,6 +46,7 @@ func removeFile(fileName string) { // makeDefaultArgs creates a default set of arguments for testing the server. func makeDefaultArgs() *server.Args { + port := server.DefaultJSONRPCPort args := &server.Args{ CmdType: server.ServeCmd, Host: server.DefaultHost, @@ -55,7 +56,7 @@ func makeDefaultArgs() *server.Args { EsPort: server.DefaultEsPort, PrometheusPort: server.DefaultPrometheusPort, NotifierPort: server.DefaultNotifierPort, - JSONRPCPort: server.DefaultJSONRPCPort, + JSONRPCPort: &port, EsIndex: server.DefaultEsIndex, RefreshDelta: server.DefaultRefreshDelta, CacheTTL: server.DefaultCacheTTL, diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index 9d42b9d..90f20ca 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -2,7 +2,9 @@ package server import ( "fmt" + "net" "net/http" + "strconv" "strings" gorilla_mux "github.com/gorilla/mux" @@ -49,36 +51,71 @@ func (cr *gorillaRpcCodecRequest) Method() (string, error) { // StartJsonRPC starts the json rpc server and registers the endpoints. func (s *Server) StartJsonRPC() error { - port := ":" + s.Args.JSONRPCPort + s.sessionManager.start() + defer s.sessionManager.stop() - s1 := gorilla_rpc.NewServer() // Create a new RPC server - // Register the type of data requested as JSON, with custom codec. - s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") - - // Register "blockchain.claimtrie.*"" handlers. - claimtrieSvc := &ClaimtrieService{s.DB} - err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") - if err != nil { - log.Errorf("RegisterService: %v\n", err) + // Set up the pure JSONRPC server with persistent connections/sessions. + for s.Args.JSONRPCPort != nil { + port := ":" + strconv.FormatUint(uint64(*s.Args.JSONRPCPort), 10) + laddr, err := net.ResolveTCPAddr("tcp", port) + if err != nil { + log.Errorf("ResoveIPAddr: %v\n", err) + break + } + listener, err := net.ListenTCP("tcp", laddr) + if err != nil { + log.Errorf("ListenTCP: %v\n", err) + break + } + acceptConnections := func(listener net.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + log.Errorf("Accept: %v\n", err) + break + } + log.Infof("Accepted: %v", conn.RemoteAddr()) + s.sessionManager.addSession(conn) + } + } + go acceptConnections(listener) + break } - // Register other "blockchain.{block,address,scripthash}.*" handlers. - err = s1.RegisterTCPService(&BlockchainBlockService{s.DB, s.Chain}, "blockchain_block") - if err != nil { - log.Errorf("RegisterService: %v\n", err) - } - err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") - if err != nil { - log.Errorf("RegisterService: %v\n", err) - } - err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash") - if err != nil { - log.Errorf("RegisterService: %v\n", err) - } + // Set up the JSONRPC over HTTP server. + for s.Args.JSONRPCHTTPPort != nil { + s1 := gorilla_rpc.NewServer() // Create a new RPC server + // Register the type of data requested as JSON, with custom codec. + s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") - r := gorilla_mux.NewRouter() - r.Handle("/rpc", s1) - log.Fatal(http.ListenAndServe(port, r)) + // Register "blockchain.claimtrie.*"" handlers. + claimtrieSvc := &ClaimtrieService{s.DB} + err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + + // Register other "blockchain.{block,address,scripthash}.*" handlers. + blockchainSvc := &BlockchainBlockService{s.DB, s.Chain} + err = s1.RegisterTCPService(blockchainSvc, "blockchain_block") + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash") + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + + r := gorilla_mux.NewRouter() + r.Handle("/rpc", s1) + port := ":" + strconv.FormatUint(uint64(*s.Args.JSONRPCHTTPPort), 10) + log.Fatal(http.ListenAndServe(port, r)) + break + } return nil } diff --git a/server/notifier.go b/server/notifier.go index 8d8a367..1f4a847 100644 --- a/server/notifier.go +++ b/server/notifier.go @@ -52,8 +52,13 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error { // RunNotifier Runs the notfying action forever func (s *Server) RunNotifier() error { - for heightHash := range s.NotifierChan { - s.DoNotify(heightHash) + for notification := range s.NotifierChan { + switch notification.(type) { + case internal.HeightHash: + heightHash, _ := notification.(internal.HeightHash) + s.DoNotify(&heightHash) + } + s.sessionManager.doNotify(notification) } return nil } diff --git a/server/notifier_test.go b/server/notifier_test.go index e642e62..3e820cc 100644 --- a/server/notifier_test.go +++ b/server/notifier_test.go @@ -80,7 +80,7 @@ func TestNotifierServer(t *testing.T) { hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") logrus.Warn("sending hash") - hub.NotifierChan <- &internal.HeightHash{Height: 1, BlockHash: hash} + hub.NotifierChan <- internal.HeightHash{Height: 1, BlockHash: hash} res := <-resCh logrus.Info(string(res)) diff --git a/server/server.go b/server/server.go index 821d5c6..0fa8057 100644 --- a/server/server.go +++ b/server/server.go @@ -18,7 +18,6 @@ import ( "github.com/ReneKroon/ttlcache/v2" "github.com/lbryio/herald.go/db" - "github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/meta" pb "github.com/lbryio/herald.go/protobuf/go" @@ -53,7 +52,8 @@ type Server struct { ExternalIP net.IP HeightSubs map[net.Addr]net.Conn HeightSubsMut sync.RWMutex - NotifierChan chan *internal.HeightHash + NotifierChan chan interface{} + sessionManager *sessionManager pb.UnimplementedHubServer } @@ -332,7 +332,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { ExternalIP: net.IPv4(127, 0, 0, 1), HeightSubs: make(map[net.Addr]net.Conn), HeightSubsMut: sync.RWMutex{}, - NotifierChan: make(chan *internal.HeightHash), + NotifierChan: make(chan interface{}), + sessionManager: newSessionManager(myDB, &chain), } // Start up our background services