Attempt LBCD connection with rpc.Client.

This commit is contained in:
Jonathan Moody 2022-12-14 16:20:37 -06:00
parent 2821fc0ee9
commit efac6ffd56
5 changed files with 90 additions and 12 deletions

View file

@ -3,6 +3,7 @@ package server
import (
"fmt"
"log"
"net/url"
"os"
"strconv"
"strings"
@ -25,6 +26,7 @@ type Args struct {
Port int
DBPath string
Chain *string
DaemonURL *url.URL
EsHost string
EsPort int
PrometheusPort int
@ -203,10 +205,19 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
environment := GetEnvironmentStandard()
parser := argparse.NewParser("herald", "herald server and client")
serveCmd := parser.NewCommand("serve", "start the hub server")
serveCmd := parser.NewCommand("serve", "start the herald server")
searchCmd := parser.NewCommand("search", "claim search")
dbCmd := parser.NewCommand("db", "db testing")
defaultDaemonURL := "localhost:9245"
if url, ok := environment["DAEMON_URL"]; ok {
defaultDaemonURL = url
}
validateURL := func(arg []string) error {
_, err := url.Parse(arg[0])
return err
}
validatePort := func(arg []string) error {
_, err := strconv.ParseUint(arg[0], 10, 16)
return err
@ -218,6 +229,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
daemonURLStr := parser.String("", "daemon-url", &argparse.Options{Required: true, Help: "URL for rpc to lbrycrd or lbcd, <rpcuser>:<rpcpassword>@<lbcd rpc ip><lbrcd rpc port>.", Validate: validateURL, Default: defaultDaemonURL})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
@ -277,6 +289,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
*jsonRPCPort = DefaultJSONRPCPort
}
daemonURL, err := url.Parse(*daemonURLStr)
if err != nil {
log.Fatalf("URL parse failed: %v", err)
}
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
args := &Args{
@ -285,6 +302,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
Port: *port,
DBPath: *dbPath,
Chain: chain,
DaemonURL: daemonURL,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,

View file

@ -207,7 +207,7 @@ func TestHeadersSubscribe(t *testing.T) {
return
}
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams, nil)
sm.start()
defer sm.stop()
@ -388,7 +388,7 @@ func TestAddressSubscribe(t *testing.T) {
return
}
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams, nil)
sm.start()
defer sm.stop()

View file

@ -1,15 +1,19 @@
package server
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"hash"
"io"
golog "log"
"net"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"regexp"
"strconv"
@ -38,6 +42,7 @@ type Server struct {
WeirdCharsRe *regexp.Regexp
DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
DaemonClient *rpc.Client
EsClient *elastic.Client
QueryCache *ttlcache.Cache
S256 *hash.Hash
@ -237,6 +242,38 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
return myDB, nil
}
type BasicAuthClientCodec struct {
client http.Client
url string
user string
password string
buff *bytes.Buffer
}
func (c BasicAuthClientCodec) Read(p []byte) (n int, err error) {
return c.buff.Read(p)
}
func (c BasicAuthClientCodec) Write(p []byte) (n int, err error) {
req, err := http.NewRequest("POST", c.url, bytes.NewReader(p))
if err != nil {
return 0, err
}
req.SetBasicAuth(c.user, c.password)
req.Header.Add("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return len(p), err
}
io.Copy(c.buff, resp.Body)
return len(p), err
}
func (c BasicAuthClientCodec) Close() error {
c.client.CloseIdleConnections()
return nil
}
// MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines.
@ -253,7 +290,24 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
log.Fatal(err)
}
var client *elastic.Client = nil
var lbcdClient *rpc.Client = nil
if args.DaemonURL != nil {
log.Warnf("connecting to lbcd daemon at %v...", args.DaemonURL)
password, _ := args.DaemonURL.User.Password()
codec := jsonrpc.NewClientCodec(
&BasicAuthClientCodec{
client: http.Client{
Timeout: 30 * time.Second,
},
url: args.DaemonURL.Host,
user: args.DaemonURL.User.Username(),
password: password,
buff: bytes.NewBuffer(nil),
})
lbcdClient = rpc.NewClientWithCodec(codec)
}
var esClient *elastic.Client = nil
if !args.DisableEs {
esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
opts := []elastic.ClientOptionFunc{
@ -265,7 +319,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
if args.Debug {
opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
}
client, err = elastic.NewClient(opts...)
esClient, err = elastic.NewClient(opts...)
if err != nil {
log.Fatal(err)
}
@ -344,7 +398,8 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
WeirdCharsRe: weirdCharsRe,
DB: myDB,
Chain: &chain,
EsClient: client,
DaemonClient: lbcdClient,
EsClient: esClient,
QueryCache: cache,
S256: &s256,
LastRefreshCheck: time.Now(),
@ -364,7 +419,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
sessionManager: nil,
}
// FIXME: HACK
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain)
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain, lbcdClient)
// Start up our background services
if !args.DisableResolve && !args.DisableRocksDBRefresh {

View file

@ -15,5 +15,5 @@ func (s *Server) GetNumPeersExported() func() int64 {
}
func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(server, db, args, grp, chain)
return newSessionManager(server, db, args, grp, chain, nil)
}

View file

@ -140,6 +140,7 @@ type sessionManager struct {
args *Args
server *Server
chain *chaincfg.Params
lbcd *rpc.Client
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
peerSubs sessionMap
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
@ -148,7 +149,7 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap
}
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params, lbcd *rpc.Client) *sessionManager {
return &sessionManager{
sessions: make(sessionMap),
grp: grp,
@ -159,6 +160,7 @@ func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args
args: args,
server: server,
chain: chain,
lbcd: lbcd,
peerSubs: make(sessionMap),
headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
@ -306,9 +308,12 @@ func (sm *sessionManager) removeSessionLocked(sess *session) {
}
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
// TODO
panic("not implemented")
return nil, nil
var reply string
err := sm.lbcd.Call("sendrawtransaction", hex.EncodeToString(rawTx), reply)
if err != nil {
return nil, err
}
return chainhash.NewHashFromStr(reply)
}
func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) {