WIP: blockchain.transaction.yyy JSON RPC implementations #78

Merged
moodyjon merged 12 commits from blockchain_tx_rpc1 into master 2022-12-06 22:14:28 +01:00
6 changed files with 44 additions and 29 deletions
Showing only changes of commit d5d8e0db7f - Show all commits

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"time" "time"
_ "net/http/pprof" _ "net/http/pprof"
@ -63,7 +64,7 @@ func main() {
return return
} }
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )

View file

@ -22,7 +22,7 @@ const (
type Args struct { type Args struct {
CmdType int CmdType int
Host string Host string
Port string Port int
DBPath string DBPath string
Chain *string Chain *string
EsHost string EsHost string
@ -67,7 +67,7 @@ type Args struct {
const ( const (
DefaultHost = "0.0.0.0" DefaultHost = "0.0.0.0"
DefaultPort = "50051" DefaultPort = 50051
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost" DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims" DefaultEsIndex = "claims"
@ -214,7 +214,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
// main server config arguments // main server config arguments
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) port := parser.Int("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Validate: validatePort, Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})

View file

@ -7,6 +7,7 @@ import (
"math" "math"
"net" "net"
"os" "os"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -86,7 +87,7 @@ func (s *Server) getAndSetExternalIp(ip, port string) error {
// storing them as known peers. Returns a map of peerKey -> object // storing them as known peers. Returns a map of peerKey -> object
func (s *Server) loadPeers() error { func (s *Server) loadPeers() error {
peerFile := s.Args.PeerFile peerFile := s.Args.PeerFile
port := s.Args.Port port := strconv.Itoa(s.Args.Port)
// First we make sure our server has come up, so we can answer back to peers. // First we make sure our server has come up, so we can answer back to peers.
var failures = 0 var failures = 0
@ -181,12 +182,12 @@ func (s *Server) subscribeToPeer(peer *Peer) error {
msg := &pb.ServerMessage{ msg := &pb.ServerMessage{
Address: s.ExternalIP.String(), Address: s.ExternalIP.String(),
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
} }
c := pb.NewHubClient(conn) c := pb.NewHubClient(conn)
log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) log.Printf("%s:%d subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
_, err = c.PeerSubscribe(ctx, msg) _, err = c.PeerSubscribe(ctx, msg)
if err != nil { if err != nil {
return err return err
@ -219,12 +220,12 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
c := pb.NewHubClient(conn) c := pb.NewHubClient(conn)
msg := &pb.HelloMessage{ msg := &pb.HelloMessage{
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
Host: s.ExternalIP.String(), Host: s.ExternalIP.String(),
Servers: []*pb.ServerMessage{}, Servers: []*pb.ServerMessage{},
} }
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer) log.Printf("%s:%d saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
res, err := c.Hello(ctx, msg) res, err := c.Hello(ctx, msg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -345,15 +346,15 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
} }
} }
if s.Args.Port == newPeer.Port && if strconv.Itoa(s.Args.Port) == newPeer.Port &&
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) { (localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) log.Printf("%s:%d addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
return nil return nil
} }
k := peerKey(newPeer) k := peerKey(newPeer)
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer) log.Printf("%s:%d adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded { if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
if ping { if ping {
_, err := s.helloPeer(newPeer) _, err := s.helloPeer(newPeer)
@ -415,7 +416,7 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage {
s.PeerServersMut.RUnlock() s.PeerServersMut.RUnlock()
return &pb.HelloMessage{ return &pb.HelloMessage{
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
Host: s.ExternalIP.String(), Host: s.ExternalIP.String(),
Servers: servers, Servers: servers,
} }

View file

@ -7,6 +7,7 @@ import (
"log" "log"
"net" "net"
"os" "os"
"strconv"
"strings" "strings"
"testing" "testing"
@ -167,7 +168,7 @@ func TestAddPeerEndpoint(t *testing.T) {
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
tests := []struct { tests := []struct {
name string name string
@ -198,7 +199,7 @@ func TestAddPeerEndpoint(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
//go hubServer.Run() //go hubServer.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -240,8 +241,8 @@ func TestAddPeerEndpoint2(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args3.Port = "50053" args3.Port = 50053
tests := []struct { tests := []struct {
name string name string
@ -266,7 +267,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -322,8 +323,8 @@ func TestAddPeerEndpoint3(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args3.Port = "50053" args3.Port = 50053
tests := []struct { tests := []struct {
name string name string
@ -348,7 +349,7 @@ func TestAddPeerEndpoint3(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -412,7 +413,7 @@ func TestUDPServer(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false args.DisableStartUDP = false
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args2.DisableStartUDP = false args2.DisableStartUDP = false
tests := []struct { tests := []struct {
@ -449,12 +450,12 @@ func TestUDPServer(t *testing.T) {
got1 := hubServer.ExternalIP.String() got1 := hubServer.ExternalIP.String()
if got1 != tt.want { if got1 != tt.want {
t.Errorf("hubServer.ExternalIP = %s, want %s\n", got1, tt.want) t.Errorf("hubServer.ExternalIP = %s, want %s\n", got1, tt.want)
t.Errorf("hubServer.Args.Port = %s\n", hubServer.Args.Port) t.Errorf("hubServer.Args.Port = %d\n", hubServer.Args.Port)
} }
got2 := hubServer2.ExternalIP.String() got2 := hubServer2.ExternalIP.String()
if got2 != tt.want { if got2 != tt.want {
t.Errorf("hubServer2.ExternalIP = %s, want %s\n", got2, tt.want) t.Errorf("hubServer2.ExternalIP = %s, want %s\n", got2, tt.want)
t.Errorf("hubServer2.Args.Port = %s\n", hubServer2.Args.Port) t.Errorf("hubServer2.Args.Port = %d\n", hubServer2.Args.Port)
} }
}) })
} }

View file

@ -13,6 +13,7 @@ import (
"net/http" "net/http"
"os" "os"
"regexp" "regexp"
"strconv"
"sync" "sync"
"time" "time"
@ -136,7 +137,8 @@ func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool)
// Run "main" function for starting the server. This blocks. // Run "main" function for starting the server. This blocks.
func (s *Server) Run() { func (s *Server) Run() {
l, err := net.Listen("tcp", ":"+s.Args.Port) address := ":" + strconv.Itoa(s.Args.Port)
l, err := net.Listen("tcp", address)
if err != nil { if err != nil {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
@ -171,6 +173,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
if myDB.LastState != nil { if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion) logrus.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height) logrus.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("genesis: %v", myDB.LastState.Genesis.String())
logrus.Infof("tip: %v", myDB.LastState.Tip.String()) logrus.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount) logrus.Infof("tx count: %v", myDB.LastState.TxCount)
} }
@ -353,11 +356,19 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
} }
if !args.DisableStartUDP { if !args.DisableStartUDP {
go func() { go func() {
err := s.UDPServer() err := s.UDPServer(s.Args.Port)
if err != nil { if err != nil {
log.Println("UDP Server failed!", err) logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
} }
}() }()
if s.Args.JSONRPCPort != 0 {
go func() {
err := s.UDPServer(s.Args.JSONRPCPort)
if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
}
}()
}
} }
if !args.DisableStartNotifier { if !args.DisableStartNotifier {
go func() { go func() {

View file

@ -219,8 +219,9 @@ func UDPPing(ip, port string) (*SPVPong, error) {
// UDPServer is a goroutine that starts an udp server that implements the hubs // UDPServer is a goroutine that starts an udp server that implements the hubs
// Ping/Pong protocol to find out about each other without making full TCP // Ping/Pong protocol to find out about each other without making full TCP
// connections. // connections.
func (s *Server) UDPServer() error { func (s *Server) UDPServer(port int) error {
address := ":" + s.Args.Port address := ":" + strconv.Itoa(port)
tip := make([]byte, 32) tip := make([]byte, 32)
addr, err := net.ResolveUDPAddr("udp", address) addr, err := net.ResolveUDPAddr("udp", address)
if err != nil { if err != nil {