Support both pure JSON and JSON-over-HTTP services.

Forward NotifierChan messages to sessionManager.
This commit is contained in:
Jonathan Moody 2022-09-27 18:46:11 -05:00
parent 7f47de2949
commit 1d227dbca8
7 changed files with 92 additions and 39 deletions

View file

@ -650,7 +650,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
// RunDetectChanges Go routine the runs continuously while the hub is active // RunDetectChanges Go routine the runs continuously while the hub is active
// to keep the db readonly view up to date and handle reorgs on the // to keep the db readonly view up to date and handle reorgs on the
// blockchain. // blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan *internal.HeightHash) { func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
go func() { go func() {
lastPrint := time.Now() lastPrint := time.Now()
for { for {
@ -674,7 +674,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan *internal.Height
} }
// DetectChanges keep the rocksdb db in sync and handle reorgs // DetectChanges keep the rocksdb db in sync and handle reorgs
func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan *internal.HeightHash) error { func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan<- interface{}) error {
err := db.DB.TryCatchUpWithPrimary() err := db.DB.TryCatchUpWithPrimary()
if err != nil { if err != nil {
return err return err

View file

@ -3,6 +3,7 @@ package server
import ( import (
"log" "log"
"os" "os"
"strconv"
"strings" "strings"
"github.com/akamensky/argparse" "github.com/akamensky/argparse"
@ -27,7 +28,8 @@ type Args struct {
EsPort string EsPort string
PrometheusPort string PrometheusPort string
NotifierPort string NotifierPort string
JSONRPCPort string JSONRPCPort *int
JSONRPCHTTPPort *int
EsIndex string EsIndex string
RefreshDelta int RefreshDelta int
CacheTTL int CacheTTL int
@ -58,7 +60,7 @@ const (
DefaultEsPort = "9200" DefaultEsPort = "9200"
DefaultPrometheusPort = "2112" DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080" DefaultNotifierPort = "18080"
DefaultJSONRPCPort = "50001" DefaultJSONRPCPort = 50001
DefaultRefreshDelta = 5 DefaultRefreshDelta = 5
DefaultCacheTTL = 5 DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt" DefaultPeerFile = "peers.txt"
@ -111,6 +113,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
searchCmd := parser.NewCommand("search", "claim search") searchCmd := parser.NewCommand("search", "claim search")
dbCmd := parser.NewCommand("db", "db testing") dbCmd := parser.NewCommand("db", "db testing")
validatePort := func(arg []string) error {
_, err := strconv.ParseUint(arg[0], 10, 16)
return err
}
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
@ -120,7 +127,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.String("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Default: DefaultJSONRPCPort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
@ -168,7 +176,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
EsPort: *esPort, EsPort: *esPort,
PrometheusPort: *prometheusPort, PrometheusPort: *prometheusPort,
NotifierPort: *notifierPort, NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort, JSONRPCPort: jsonRPCPort,
JSONRPCHTTPPort: jsonRPCHTTPPort,
EsIndex: *esIndex, EsIndex: *esIndex,
RefreshDelta: *refreshDelta, RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL, CacheTTL: *cacheTTL,

View file

@ -46,6 +46,7 @@ func removeFile(fileName string) {
// makeDefaultArgs creates a default set of arguments for testing the server. // makeDefaultArgs creates a default set of arguments for testing the server.
func makeDefaultArgs() *server.Args { func makeDefaultArgs() *server.Args {
port := server.DefaultJSONRPCPort
args := &server.Args{ args := &server.Args{
CmdType: server.ServeCmd, CmdType: server.ServeCmd,
Host: server.DefaultHost, Host: server.DefaultHost,
@ -55,7 +56,7 @@ func makeDefaultArgs() *server.Args {
EsPort: server.DefaultEsPort, EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort, PrometheusPort: server.DefaultPrometheusPort,
NotifierPort: server.DefaultNotifierPort, NotifierPort: server.DefaultNotifierPort,
JSONRPCPort: server.DefaultJSONRPCPort, JSONRPCPort: &port,
EsIndex: server.DefaultEsIndex, EsIndex: server.DefaultEsIndex,
RefreshDelta: server.DefaultRefreshDelta, RefreshDelta: server.DefaultRefreshDelta,
CacheTTL: server.DefaultCacheTTL, CacheTTL: server.DefaultCacheTTL,

View file

@ -2,7 +2,9 @@ package server
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"strconv"
"strings" "strings"
gorilla_mux "github.com/gorilla/mux" gorilla_mux "github.com/gorilla/mux"
@ -49,36 +51,71 @@ func (cr *gorillaRpcCodecRequest) Method() (string, error) {
// StartJsonRPC starts the json rpc server and registers the endpoints. // StartJsonRPC starts the json rpc server and registers the endpoints.
func (s *Server) StartJsonRPC() error { func (s *Server) StartJsonRPC() error {
port := ":" + s.Args.JSONRPCPort s.sessionManager.start()
defer s.sessionManager.stop()
s1 := gorilla_rpc.NewServer() // Create a new RPC server // Set up the pure JSONRPC server with persistent connections/sessions.
// Register the type of data requested as JSON, with custom codec. for s.Args.JSONRPCPort != nil {
s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") port := ":" + strconv.FormatUint(uint64(*s.Args.JSONRPCPort), 10)
laddr, err := net.ResolveTCPAddr("tcp", port)
// Register "blockchain.claimtrie.*"" handlers. if err != nil {
claimtrieSvc := &ClaimtrieService{s.DB} log.Errorf("ResoveIPAddr: %v\n", err)
err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") break
if err != nil { }
log.Errorf("RegisterService: %v\n", err) listener, err := net.ListenTCP("tcp", laddr)
if err != nil {
log.Errorf("ListenTCP: %v\n", err)
break
}
acceptConnections := func(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
log.Errorf("Accept: %v\n", err)
break
}
log.Infof("Accepted: %v", conn.RemoteAddr())
s.sessionManager.addSession(conn)
}
}
go acceptConnections(listener)
break
} }
// Register other "blockchain.{block,address,scripthash}.*" handlers. // Set up the JSONRPC over HTTP server.
err = s1.RegisterTCPService(&BlockchainBlockService{s.DB, s.Chain}, "blockchain_block") for s.Args.JSONRPCHTTPPort != nil {
if err != nil { s1 := gorilla_rpc.NewServer() // Create a new RPC server
log.Errorf("RegisterService: %v\n", err) // Register the type of data requested as JSON, with custom codec.
} s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json")
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
r := gorilla_mux.NewRouter() // Register "blockchain.claimtrie.*"" handlers.
r.Handle("/rpc", s1) claimtrieSvc := &ClaimtrieService{s.DB}
log.Fatal(http.ListenAndServe(port, r)) err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
// Register other "blockchain.{block,address,scripthash}.*" handlers.
blockchainSvc := &BlockchainBlockService{s.DB, s.Chain}
err = s1.RegisterTCPService(blockchainSvc, "blockchain_block")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(*s.Args.JSONRPCHTTPPort), 10)
log.Fatal(http.ListenAndServe(port, r))
break
}
return nil return nil
} }

View file

@ -52,8 +52,13 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error {
// RunNotifier Runs the notfying action forever // RunNotifier Runs the notfying action forever
func (s *Server) RunNotifier() error { func (s *Server) RunNotifier() error {
for heightHash := range s.NotifierChan { for notification := range s.NotifierChan {
s.DoNotify(heightHash) switch notification.(type) {
case internal.HeightHash:
heightHash, _ := notification.(internal.HeightHash)
s.DoNotify(&heightHash)
}
s.sessionManager.doNotify(notification)
} }
return nil return nil
} }

View file

@ -80,7 +80,7 @@ func TestNotifierServer(t *testing.T) {
hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
logrus.Warn("sending hash") logrus.Warn("sending hash")
hub.NotifierChan <- &internal.HeightHash{Height: 1, BlockHash: hash} hub.NotifierChan <- internal.HeightHash{Height: 1, BlockHash: hash}
res := <-resCh res := <-resCh
logrus.Info(string(res)) logrus.Info(string(res))

View file

@ -18,7 +18,6 @@ import (
"github.com/ReneKroon/ttlcache/v2" "github.com/ReneKroon/ttlcache/v2"
"github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/internal/metrics"
"github.com/lbryio/herald.go/meta" "github.com/lbryio/herald.go/meta"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
@ -53,7 +52,8 @@ type Server struct {
ExternalIP net.IP ExternalIP net.IP
HeightSubs map[net.Addr]net.Conn HeightSubs map[net.Addr]net.Conn
HeightSubsMut sync.RWMutex HeightSubsMut sync.RWMutex
NotifierChan chan *internal.HeightHash NotifierChan chan interface{}
sessionManager *sessionManager
pb.UnimplementedHubServer pb.UnimplementedHubServer
} }
@ -332,7 +332,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
ExternalIP: net.IPv4(127, 0, 0, 1), ExternalIP: net.IPv4(127, 0, 0, 1),
HeightSubs: make(map[net.Addr]net.Conn), HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{}, HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan *internal.HeightHash), NotifierChan: make(chan interface{}),
sessionManager: newSessionManager(myDB, &chain),
} }
// Start up our background services // Start up our background services