WIP: Json rpc federation, search/getclaimbyid, and shutdown #76

Merged
jeffreypicard merged 11 commits from json-rpc-federation-and-shutdown into master 2022-12-07 17:01:36 +01:00
7 changed files with 31 additions and 28 deletions
Showing only changes of commit 95a0b863a2 - Show all commits

View file

@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"os" "os"
"strconv" "strconv"
@ -16,13 +15,20 @@ import (
"github.com/lbryio/herald.go/server" "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )
// lineCountFile takes a fileName and counts the number of lines in it. // lineCountFile takes a fileName and counts the number of lines in it.
func lineCountFile(fileName string) int { func lineCountFile(fileName string) int {
f, err := os.Open(fileName) f, err := os.Open(fileName)
defer f.Close() defer func() {
err := f.Close()
if err != nil {
log.Warn(err)
}
}()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return 0 return 0
@ -207,7 +213,7 @@ func TestAddPeerEndpoint(t *testing.T) {
go hubServer2.Run() go hubServer2.Run()
//go hubServer.Run() //go hubServer.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -282,7 +288,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -372,14 +378,14 @@ func TestAddPeerEndpoint3(t *testing.T) {
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
} }
conn2, err := grpc.Dial("localhost:50052", conn2, err := grpc.Dial("localhost:50052",
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {

View file

@ -185,6 +185,9 @@ func TestHeaders(t *testing.T) {
} }
var resp *BlockHeadersResp var resp *BlockHeadersResp
err := s.Headers(&req, &resp) err := s.Headers(&req, &resp)
if err != nil {
t.Errorf("Headers: %v", err)
}
marshalled, err := json.MarshalIndent(resp, "", " ") marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil { if err != nil {
t.Errorf("height: %v unmarshal err: %v", height, err) t.Errorf("height: %v unmarshal err: %v", height, err)

View file

@ -8,7 +8,6 @@ import (
"github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -102,7 +101,7 @@ func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outp
BlockedTotal: 0, //TODO BlockedTotal: 0, //TODO
} }
logrus.Warn(res) log.Warn(res)
*result = res *result = res
return nil return nil

View file

@ -54,9 +54,9 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error {
// RunNotifier Runs the notfying action forever // RunNotifier Runs the notfying action forever
func (s *Server) RunNotifier() error { func (s *Server) RunNotifier() error {
for notification := range s.NotifierChan { for notification := range s.NotifierChan {
switch notification.(type) { switch note := notification.(type) {
case internal.HeightHash: case internal.HeightHash:
heightHash, _ := notification.(internal.HeightHash) heightHash := note
s.DoNotify(&heightHash) s.DoNotify(&heightHash)
// Do we need this? // Do we need this?
// case peerNotification: // case peerNotification:

View file

@ -68,15 +68,12 @@ func tcpRead(conn net.Conn) ([]byte, error) {
func TestNotifierServer(t *testing.T) { func TestNotifierServer(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
hub := server.MakeHubServer(ctx, args) hub := server.MakeHubServer(ctx, args)
go hub.NotifierServer() go hub.NotifierServer()
go hub.RunNotifier() go hub.RunNotifier()
// time.Sleep(time.Second * 2)
addr := fmt.Sprintf(":%d", args.NotifierPort) addr := fmt.Sprintf(":%d", args.NotifierPort)
logrus.Info(addr) logrus.Info(addr)
conn, err := tcpConnReady(addr) conn, err := tcpConnReady(addr)
@ -99,7 +96,6 @@ func TestNotifierServer(t *testing.T) {
// Hacky but needed because if the reader isn't ready // Hacky but needed because if the reader isn't ready
// before the writer sends it won't get the data // before the writer sends it won't get the data
// time.Sleep(time.Second * 10)
err = subReady(hub) err = subReady(hub)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -7,7 +7,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
"io/ioutil"
//"io/ioutil"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -176,7 +178,7 @@ func (s *Server) Stop() {
} }
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub") tmpName, err := os.MkdirTemp("", "go-lbry-hub")
if err != nil { if err != nil {
logrus.Info(err) logrus.Info(err)
log.Fatal(err) log.Fatal(err)
@ -433,7 +435,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
// for this hub to allow for metric tracking. // for this hub to allow for metric tracking.
func (s *Server) prometheusEndpoint(port string, endpoint string) { func (s *Server) prometheusEndpoint(port string, endpoint string) {
http.Handle("/"+endpoint, promhttp.Handler()) http.Handle("/"+endpoint, promhttp.Handler())
log.Println(fmt.Sprintf("listening on :%s /%s", port, endpoint)) log.Printf("listening on :%s /%s\n", port, endpoint)
err := http.ListenAndServe(":"+port, nil) err := http.ListenAndServe(":"+port, nil)
log.Fatalln("Shouldn't happen??!?!", err) log.Fatalln("Shouldn't happen??!?!", err)
} }

View file

@ -62,12 +62,11 @@ type session struct {
func (s *session) doNotify(notification interface{}) { func (s *session) doNotify(notification interface{}) {
var method string var method string
var params interface{} var params interface{}
switch notification.(type) { switch note := notification.(type) {
case headerNotification: case headerNotification:
moodyjon commented 2022-12-07 15:09:54 +01:00 (Migrated from github.com)
Review

Nice!

Nice!
if !s.headersSub { if !s.headersSub {
return return
} }
note, _ := notification.(headerNotification)
heightHash := note.HeightHash heightHash := note.HeightHash
method = "blockchain.headers.subscribe" method = "blockchain.headers.subscribe"
if s.headersSubRaw { if s.headersSubRaw {
@ -87,7 +86,6 @@ func (s *session) doNotify(notification interface{}) {
params = header params = header
} }
case hashXNotification: case hashXNotification:
note, _ := notification.(hashXNotification)
orig, ok := s.hashXSubs[note.hashX] orig, ok := s.hashXSubs[note.hashX]
if !ok { if !ok {
return return
@ -106,7 +104,6 @@ func (s *session) doNotify(notification interface{}) {
if !s.peersSub { if !s.peersSub {
return return
} }
note, _ := notification.(peerNotification)
method = "server.peers.subscribe" method = "server.peers.subscribe"
params = []string{note.address, note.port} params = []string{note.address, note.port}
@ -364,16 +361,15 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s
} }
func (sm *sessionManager) doNotify(notification interface{}) { func (sm *sessionManager) doNotify(notification interface{}) {
switch notification.(type) { switch note := notification.(type) {
case internal.HeightHash: case internal.HeightHash:
// The HeightHash notification translates to headerNotification. // The HeightHash notification translates to headerNotification.
notification = &headerNotification{HeightHash: notification.(internal.HeightHash)} notification = &headerNotification{HeightHash: note}
} }
sm.sessionsMut.RLock() sm.sessionsMut.RLock()
var subsCopy sessionMap var subsCopy sessionMap
switch notification.(type) { switch note := notification.(type) {
case headerNotification: case headerNotification:
note, _ := notification.(headerNotification)
subsCopy = sm.headerSubs subsCopy = sm.headerSubs
if len(subsCopy) > 0 { if len(subsCopy) > 0 {
hdr := [HEADER_SIZE]byte{} hdr := [HEADER_SIZE]byte{}
@ -382,7 +378,6 @@ func (sm *sessionManager) doNotify(notification interface{}) {
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:]) note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
} }
case hashXNotification: case hashXNotification:
note, _ := notification.(hashXNotification)
hashXSubs, ok := sm.hashXSubs[note.hashX] hashXSubs, ok := sm.hashXSubs[note.hashX]
if ok { if ok {
subsCopy = hashXSubs subsCopy = hashXSubs
@ -410,8 +405,10 @@ type sessionServerCodec struct {
// ReadRequestHeader provides ability to rewrite the incoming // ReadRequestHeader provides ability to rewrite the incoming
// request "method" field. For example: // request "method" field. For example:
// blockchain.block.get_header -> blockchain.block.Get_header //
// blockchain.address.listunspent -> blockchain.address.Listunspent // blockchain.block.get_header -> blockchain.block.Get_header
// blockchain.address.listunspent -> blockchain.address.Listunspent
//
// This makes the "method" string compatible with rpc.Server // This makes the "method" string compatible with rpc.Server
// requirements. // requirements.
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error { func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {