WIP: Json rpc federation, search/getclaimbyid, and shutdown #76

Merged
jeffreypicard merged 11 commits from json-rpc-federation-and-shutdown into master 2022-12-07 17:01:36 +01:00
4 changed files with 26 additions and 29 deletions
Showing only changes of commit f774823b2e - Show all commits

View file

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/csv" "encoding/csv"
"encoding/hex" "encoding/hex"
"log"
"os" "os"
"strings" "strings"
"testing" "testing"
@ -14,6 +13,7 @@ import (
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
) )
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View file

@ -12,13 +12,13 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log"
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/chaincfg/chainhash"
log "github.com/sirupsen/logrus"
) )
const ( const (

View file

@ -6,7 +6,6 @@ import (
"encoding/csv" "encoding/csv"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log"
"math" "math"
"math/big" "math/big"
"os" "os"
@ -16,6 +15,7 @@ import (
dbpkg "github.com/lbryio/herald.go/db" dbpkg "github.com/lbryio/herald.go/db"
prefixes "github.com/lbryio/herald.go/db/prefixes" prefixes "github.com/lbryio/herald.go/db/prefixes"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
) )
func TestPrefixRegistry(t *testing.T) { func TestPrefixRegistry(t *testing.T) {

View file

@ -7,10 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
golog "log"
//"io/ioutil"
"log"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -29,7 +26,7 @@ import (
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
logrus "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
) )
@ -180,26 +177,26 @@ func (s *Server) Stop() {
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := os.MkdirTemp("", "go-lbry-hub") tmpName, err := os.MkdirTemp("", "go-lbry-hub")
if err != nil { if err != nil {
logrus.Info(err) log.Info(err)
log.Fatal(err) log.Fatal(err)
} }
logrus.Info("tmpName", tmpName) log.Info("tmpName", tmpName)
if err != nil { if err != nil {
logrus.Info(err) log.Info(err)
} }
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp) myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
if err != nil { if err != nil {
// Can't load the db, fail loudly // Can't load the db, fail loudly
logrus.Info(err) log.Info(err)
log.Fatalln(err) log.Fatalln(err)
} }
if myDB.LastState != nil { if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion) log.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height) log.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("genesis: %v", myDB.LastState.Genesis.String()) log.Infof("genesis: %v", myDB.LastState.Genesis.String())
logrus.Infof("tip: %v", myDB.LastState.Tip.String()) log.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount) log.Infof("tx count: %v", myDB.LastState.TxCount)
} }
blockingChannelHashes := make([][]byte, 0, 10) blockingChannelHashes := make([][]byte, 0, 10)
@ -210,7 +207,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
for _, id := range args.BlockingChannelIds { for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) log.Warn("Invalid channel id: ", id)
continue continue
} }
blockingChannelHashes = append(blockingChannelHashes, hash) blockingChannelHashes = append(blockingChannelHashes, hash)
@ -220,7 +217,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
for _, id := range args.FilteringChannelIds { for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) log.Warn("Invalid channel id: ", id)
continue continue
} }
filteringChannelHashes = append(filteringChannelHashes, hash) filteringChannelHashes = append(filteringChannelHashes, hash)
@ -231,10 +228,10 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
myDB.FilteringChannelHashes = filteringChannelHashes myDB.FilteringChannelHashes = filteringChannelHashes
if len(filteringIds) > 0 { if len(filteringIds) > 0 {
logrus.Infof("filtering claims reposted by channels: %+s", filteringIds) log.Infof("filtering claims reposted by channels: %+s", filteringIds)
} }
if len(blockingIds) > 0 { if len(blockingIds) > 0 {
logrus.Infof("blocking claims reposted by channels: %+s", blockingIds) log.Infof("blocking claims reposted by channels: %+s", blockingIds)
} }
return myDB, nil return myDB, nil
@ -266,7 +263,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
elastic.SetURL(esUrl), elastic.SetURL(esUrl),
} }
if args.Debug { if args.Debug {
opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0))) opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
} }
client, err = elastic.NewClient(opts...) client, err = elastic.NewClient(opts...)
if err != nil { if err != nil {
@ -295,7 +292,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
if !args.DisableResolve { if !args.DisableResolve {
myDB, err = LoadDatabase(args, grp) myDB, err = LoadDatabase(args, grp)
if err != nil { if err != nil {
logrus.Warning(err) log.Warning(err)
} }
} }
@ -326,7 +323,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
chain := chaincfg.MainNetParams chain := chaincfg.MainNetParams
if dbChain != nil && cliChain != nil { if dbChain != nil && cliChain != nil {
if dbChain != cliChain { if dbChain != cliChain {
logrus.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name) log.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name)
} }
chain = *dbChain chain = *dbChain
} else if dbChain != nil { } else if dbChain != nil {
@ -334,7 +331,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
} else if cliChain != nil { } else if cliChain != nil {
chain = *cliChain chain = *cliChain
} }
logrus.Infof("network: %v", chain.Name) log.Infof("network: %v", chain.Name)
args.GenesisHash = chain.GenesisHash.String() args.GenesisHash = chain.GenesisHash.String()
@ -371,7 +368,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
// Start up our background services // Start up our background services
if !args.DisableResolve && !args.DisableRocksDBRefresh { if !args.DisableResolve && !args.DisableRocksDBRefresh {
logrus.Info("Running detect changes") log.Info("Running detect changes")
myDB.RunDetectChanges(s.NotifierChan) myDB.RunDetectChanges(s.NotifierChan)
} }
if !args.DisableBlockingAndFiltering { if !args.DisableBlockingAndFiltering {
@ -384,14 +381,14 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
go func() { go func() {
err := s.UDPServer(s.Args.Port) err := s.UDPServer(s.Args.Port)
if err != nil { if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err) log.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
} }
}() }()
if s.Args.JSONRPCPort != 0 { if s.Args.JSONRPCPort != 0 {
go func() { go func() {
err := s.UDPServer(s.Args.JSONRPCPort) err := s.UDPServer(s.Args.JSONRPCPort)
if err != nil { if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err) log.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
} }
}() }()
} }
@ -593,7 +590,7 @@ func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs,
BlockedTotal: 0, //TODO BlockedTotal: 0, //TODO
} }
logrus.Warn(res) log.Warn(res)
return res, nil return res, nil
} }