WIP: Json rpc federation, search/getclaimbyid, and shutdown #76

Merged
jeffreypicard merged 11 commits from json-rpc-federation-and-shutdown into master 2022-12-07 17:01:36 +01:00
5 changed files with 27 additions and 20 deletions
Showing only changes of commit 0a9ee690b5 - Show all commits

View file

@ -26,9 +26,9 @@ type Args struct {
DBPath string DBPath string
Chain *string Chain *string
EsHost string EsHost string
EsPort string EsPort int
PrometheusPort string PrometheusPort int
NotifierPort string NotifierPort int
JSONRPCPort int JSONRPCPort int
JSONRPCHTTPPort int JSONRPCHTTPPort int
MaxSessions int MaxSessions int
@ -71,9 +71,9 @@ const (
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost" DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims" DefaultEsIndex = "claims"
DefaultEsPort = "9200" DefaultEsPort = 9200
DefaultPrometheusPort = "2112" DefaultPrometheusPort = 2112
DefaultNotifierPort = "18080" DefaultNotifierPort = 18080
DefaultJSONRPCPort = 50001 DefaultJSONRPCPort = 50001
DefaultJSONRPCHTTPPort = 50002 DefaultJSONRPCHTTPPort = 50002
DefaultMaxSessions = 10000 DefaultMaxSessions = 10000
@ -219,9 +219,9 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) notifierPort := parser.Int("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort}) jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions}) maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
@ -334,11 +334,17 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
} }
if esPort, ok := environment["ELASTIC_PORT"]; ok { if esPort, ok := environment["ELASTIC_PORT"]; ok {
args.EsPort = esPort args.EsPort, err = strconv.Atoi(esPort)
if err != nil {
log.Fatal(err)
}
} }
if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok { if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok {
args.PrometheusPort = prometheusPort args.PrometheusPort, err = strconv.Atoi(prometheusPort)
if err != nil {
log.Fatal(err)
}
} }
/* /*

View file

@ -175,7 +175,7 @@ func TestAddPeerEndpoint(t *testing.T) {
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.DisableStartNotifier = false args2.DisableStartNotifier = false
args2.Port = 50052 args2.Port = 50052
args2.NotifierPort = "18081" args2.NotifierPort = 18081
tests := []struct { tests := []struct {
name string name string
@ -255,8 +255,8 @@ func TestAddPeerEndpoint2(t *testing.T) {
args.DisableStartNotifier = false args.DisableStartNotifier = false
args2.DisableStartNotifier = false args2.DisableStartNotifier = false
args3.DisableStartNotifier = false args3.DisableStartNotifier = false
args2.NotifierPort = "18081" args2.NotifierPort = 18081
args3.NotifierPort = "18082" args3.NotifierPort = 18082
tests := []struct { tests := []struct {
name string name string
@ -345,8 +345,8 @@ func TestAddPeerEndpoint3(t *testing.T) {
args.DisableStartNotifier = false args.DisableStartNotifier = false
args2.DisableStartNotifier = false args2.DisableStartNotifier = false
args3.DisableStartNotifier = false args3.DisableStartNotifier = false
args2.NotifierPort = "18081" args2.NotifierPort = 18081
args3.NotifierPort = "18082" args3.NotifierPort = 18082
tests := []struct { tests := []struct {
name string name string

View file

@ -2,6 +2,7 @@ package server
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"net" "net"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
@ -72,7 +73,7 @@ func (s *Server) RunNotifier() error {
// NotifierServer implements the TCP protocol for height/blockheader notifications // NotifierServer implements the TCP protocol for height/blockheader notifications
func (s *Server) NotifierServer() error { func (s *Server) NotifierServer() error {
s.Grp.Add(1) s.Grp.Add(1)
address := ":" + s.Args.NotifierPort address := ":" + fmt.Sprintf("%d", s.Args.NotifierPort)
addr, err := net.ResolveTCPAddr("tcp", address) addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil { if err != nil {
return err return err
moodyjon commented 2022-11-17 19:37:07 +01:00 (Migrated from github.com)
Review

Probably need peerNotification case here.

Probably need `peerNotification` case here.
jeffreypicard commented 2022-11-17 23:04:39 +01:00 (Migrated from github.com)
Review

Maybe. We need to refactor this whole section of code, but I think that ends up getting handled by the catch-all s.sessionManager.doNotify

Maybe. We need to refactor this whole section of code, but I think that ends up getting handled by the catch-all `s.sessionManager.doNotify`
moodyjon commented 2022-11-19 01:02:16 +01:00 (Migrated from github.com)
Review

Never mind... I have been reading the code more, and NotifierChan is only populated with HeightHash notifications coming from observing the DB.

Never mind... I have been reading the code more, and NotifierChan is only populated with HeightHash notifications coming from observing the DB.
jeffreypicard commented 2022-11-19 08:27:35 +01:00 (Migrated from github.com)
Review

I send it the new peers in federation.go addPeer. So I'm pretty sure this does get notified of new peers.

I send it the new peers in federation.go `addPeer`. So I'm pretty sure this does get notified of new peers.

View file

@ -77,7 +77,7 @@ func TestNotifierServer(t *testing.T) {
// time.Sleep(time.Second * 2) // time.Sleep(time.Second * 2)
addr := fmt.Sprintf(":%s", args.NotifierPort) addr := fmt.Sprintf(":%d", args.NotifierPort)
logrus.Info(addr) logrus.Info(addr)
conn, err := tcpConnReady(addr) conn, err := tcpConnReady(addr)
if err != nil { if err != nil {

View file

@ -256,7 +256,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
var client *elastic.Client = nil var client *elastic.Client = nil
if !args.DisableEs { if !args.DisableEs {
esUrl := args.EsHost + ":" + args.EsPort esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
opts := []elastic.ClientOptionFunc{ opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true), elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60), elastic.SetSnifferTimeoutStartup(time.Second * 60),
@ -376,7 +376,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
myDB.RunGetBlocksAndFilters() myDB.RunGetBlocksAndFilters()
} }
if !args.DisableStartPrometheus { if !args.DisableStartPrometheus {
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics") go s.prometheusEndpoint(fmt.Sprintf("%d", s.Args.PrometheusPort), "metrics")
} }
if !args.DisableStartUDP { if !args.DisableStartUDP {
go func() { go func() {