WIP: Json rpc federation, search/getclaimbyid, and shutdown (#76)

* Cleanup shutdown and peers subscribe

this has errors currently, need to figure out data race

* fixed data race finally....

* getclaimbyid and search

* hook jsonrpc peers subcribe into current federation

* cleanup some peers stuff

* remove commented code

* tie into session manager in jsonrpc peers subscribe

* use int for port args

* cleanup and fix a bunch of compiler warnings

* use logrus everywhere

* cleanup test
This commit is contained in:
Jeffrey Picard 2022-12-07 11:01:36 -05:00 committed by GitHub
parent 317cdf7129
commit 711c4b4b7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 412 additions and 131 deletions

View file

@ -670,7 +670,6 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db. // Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() { func (db *ReadOnlyDBColumnFamily) Shutdown() {
db.Grp.StopAndWait()
log.Println("Calling cleanup...") log.Println("Calling cleanup...")
db.Cleanup() db.Cleanup()
log.Println("Leaving Shutdown...") log.Println("Leaving Shutdown...")

View file

@ -958,7 +958,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
} }
blockTxsCache[txHeight] = txs blockTxsCache[txHeight] = txs
} }
blockTxs, _ := blockTxsCache[txHeight] blockTxs := blockTxsCache[txHeight]
results = append(results, TxMerkle{ results = append(results, TxMerkle{
TxHash: txNumKey.TxHash, TxHash: txNumKey.TxHash,
RawTx: txVal.RawTx, RawTx: txVal.RawTx,
@ -970,6 +970,45 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
return results, nil return results, nil
} }
func (db *ReadOnlyDBColumnFamily) GetClaimByID(claimID string) ([]*ExpandedResolveResult, []*ExpandedResolveResult, error) {
rows := make([]*ExpandedResolveResult, 0)
extras := make([]*ExpandedResolveResult, 0)
claimHash, err := hex.DecodeString(claimID)
if err != nil {
return nil, nil, err
}
stream, err := db.FsGetClaimByHash(claimHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Stream = &optionalResolveResultOrError{res: stream}
rows = append(rows, res)
if stream != nil && stream.ChannelHash != nil {
channel, err := db.FsGetClaimByHash(stream.ChannelHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Channel = &optionalResolveResultOrError{res: channel}
extras = append(extras, res)
}
if stream != nil && stream.RepostedClaimHash != nil {
repost, err := db.FsGetClaimByHash(stream.RepostedClaimHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Repost = &optionalResolveResultOrError{res: repost}
extras = append(extras, res)
}
return rows, extras, nil
}
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) { func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
handle, err := db.EnsureHandle(prefixes.DBState) handle, err := db.EnsureHandle(prefixes.DBState)
if err != nil { if err != nil {

View file

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/csv" "encoding/csv"
"encoding/hex" "encoding/hex"
"log"
"os" "os"
"strings" "strings"
"testing" "testing"
@ -14,6 +13,7 @@ import (
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
) )
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View file

@ -12,13 +12,13 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log"
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/chaincfg/chainhash"
log "github.com/sirupsen/logrus"
) )
const ( const (

View file

@ -6,7 +6,6 @@ import (
"encoding/csv" "encoding/csv"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log"
"math" "math"
"math/big" "math/big"
"os" "os"
@ -16,6 +15,7 @@ import (
dbpkg "github.com/lbryio/herald.go/db" dbpkg "github.com/lbryio/herald.go/db"
prefixes "github.com/lbryio/herald.go/db/prefixes" prefixes "github.com/lbryio/herald.go/db/prefixes"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
) )
func TestPrefixRegistry(t *testing.T) { func TestPrefixRegistry(t *testing.T) {

31
main.go
View file

@ -3,7 +3,6 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"time" "time"
_ "net/http/pprof" _ "net/http/pprof"
@ -14,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )
func main() { func main() {
@ -29,43 +29,22 @@ func main() {
if args.CmdType == server.ServeCmd { if args.CmdType == server.ServeCmd {
// This will cancel goroutines with the server finishes. // This will cancel goroutines with the server finishes.
// ctxWCancel, cancel := context.WithCancel(ctx)
// defer cancel()
stopGroup := stop.New() stopGroup := stop.New()
// defer stopGroup.Stop()
initsignals(stopGroup.Ch()) initsignals()
interrupt := interruptListener() interrupt := interruptListener()
// s := server.MakeHubServer(ctxWCancel, args)
s := server.MakeHubServer(stopGroup, args) s := server.MakeHubServer(stopGroup, args)
go s.Run() go s.Run()
defer func() { defer s.Stop()
log.Println("Shutting down server...")
if s.EsClient != nil {
log.Println("Stopping es client...")
s.EsClient.Stop()
}
if s.GrpcServer != nil {
log.Println("Stopping grpc server...")
s.GrpcServer.GracefulStop()
}
if s.DB != nil {
log.Println("Stopping database connection...")
s.DB.Shutdown()
}
log.Println("Returning from main...")
}()
<-interrupt <-interrupt
return return
} }
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+fmt.Sprintf("%d", args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {

View file

@ -26,9 +26,9 @@ type Args struct {
DBPath string DBPath string
Chain *string Chain *string
EsHost string EsHost string
EsPort string EsPort int
PrometheusPort string PrometheusPort int
NotifierPort string NotifierPort int
JSONRPCPort int JSONRPCPort int
JSONRPCHTTPPort int JSONRPCHTTPPort int
MaxSessions int MaxSessions int
@ -71,9 +71,9 @@ const (
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost" DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims" DefaultEsIndex = "claims"
DefaultEsPort = "9200" DefaultEsPort = 9200
DefaultPrometheusPort = "2112" DefaultPrometheusPort = 2112
DefaultNotifierPort = "18080" DefaultNotifierPort = 18080
DefaultJSONRPCPort = 50001 DefaultJSONRPCPort = 50001
DefaultJSONRPCHTTPPort = 50002 DefaultJSONRPCHTTPPort = 50002
DefaultMaxSessions = 10000 DefaultMaxSessions = 10000
@ -219,9 +219,9 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) notifierPort := parser.Int("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort}) jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions}) maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
@ -334,11 +334,17 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
} }
if esPort, ok := environment["ELASTIC_PORT"]; ok { if esPort, ok := environment["ELASTIC_PORT"]; ok {
args.EsPort = esPort args.EsPort, err = strconv.Atoi(esPort)
if err != nil {
log.Fatal(err)
}
} }
if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok { if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok {
args.PrometheusPort = prometheusPort args.PrometheusPort, err = strconv.Atoi(prometheusPort)
if err != nil {
log.Fatal(err)
}
} }
/* /*

View file

@ -3,7 +3,6 @@ package server
import ( import (
"bufio" "bufio"
"context" "context"
"log"
"math" "math"
"net" "net"
"os" "os"
@ -14,7 +13,9 @@ import (
"github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )
// Peer holds relevant information about peers that we know about. // Peer holds relevant information about peers that we know about.
@ -99,7 +100,7 @@ retry:
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2))) time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
conn, err := grpc.DialContext(ctx, conn, err := grpc.DialContext(ctx,
"0.0.0.0:"+port, "0.0.0.0:"+port,
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -172,7 +173,7 @@ func (s *Server) subscribeToPeer(peer *Peer) error {
conn, err := grpc.DialContext(ctx, conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port, peer.Address+":"+peer.Port,
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -208,7 +209,7 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
conn, err := grpc.DialContext(ctx, conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port, peer.Address+":"+peer.Port,
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -278,7 +279,7 @@ func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
conn, err := grpc.DialContext(ctx, conn, err := grpc.DialContext(ctx,
peerToNotify.Address+":"+peerToNotify.Port, peerToNotify.Address+":"+peerToNotify.Port,
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -370,6 +371,10 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
metrics.PeersKnown.Inc() metrics.PeersKnown.Inc()
s.writePeers() s.writePeers()
s.notifyPeerSubs(newPeer) s.notifyPeerSubs(newPeer)
// This is weird because we're doing grpc and jsonrpc here.
// Do we still want to custom grpc?
log.Warn("Sending peer to NotifierChan")
s.NotifierChan <- peerNotification{newPeer.Address, newPeer.Port}
// Subscribe to all our peers for now // Subscribe to all our peers for now
if subscribe { if subscribe {

View file

@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"os" "os"
"strconv" "strconv"
@ -16,13 +15,20 @@ import (
"github.com/lbryio/herald.go/server" "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )
// lineCountFile takes a fileName and counts the number of lines in it. // lineCountFile takes a fileName and counts the number of lines in it.
func lineCountFile(fileName string) int { func lineCountFile(fileName string) int {
f, err := os.Open(fileName) f, err := os.Open(fileName)
defer f.Close() defer func() {
err := f.Close()
if err != nil {
log.Warn(err)
}
}()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return 0 return 0
@ -51,6 +57,7 @@ func TestAddPeer(t *testing.T) {
// ctx := context.Background() // ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableStartNotifier = false
tests := []struct { tests := []struct {
name string name string
@ -100,6 +107,7 @@ func TestAddPeer(t *testing.T) {
if got != tt.want { if got != tt.want {
t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want) t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want)
} }
hubServer.Stop()
}) })
} }
@ -107,10 +115,10 @@ func TestAddPeer(t *testing.T) {
// TestPeerWriter tests that peers get written properly // TestPeerWriter tests that peers get written properly
func TestPeerWriter(t *testing.T) { func TestPeerWriter(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableWritePeers = false args.DisableWritePeers = false
args.DisableStartNotifier = false
tests := []struct { tests := []struct {
name string name string
@ -145,17 +153,16 @@ func TestPeerWriter(t *testing.T) {
Port: "50051", Port: "50051",
} }
} }
//log.Printf("Adding peer %+v\n", peer)
err := hubServer.AddPeerExported()(peer, false, false) err := hubServer.AddPeerExported()(peer, false, false)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
} }
//log.Println("Counting lines...")
got := lineCountFile(hubServer.Args.PeerFile) got := lineCountFile(hubServer.Args.PeerFile)
if got != tt.want { if got != tt.want {
t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want) t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want)
} }
hubServer.Stop()
}) })
} }
@ -164,11 +171,13 @@ func TestPeerWriter(t *testing.T) {
// TestAddPeerEndpoint tests the ability to add peers // TestAddPeerEndpoint tests the ability to add peers
func TestAddPeerEndpoint(t *testing.T) { func TestAddPeerEndpoint(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableStartNotifier = false
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.DisableStartNotifier = false
args2.Port = 50052 args2.Port = 50052
args2.NotifierPort = 18081
tests := []struct { tests := []struct {
name string name string
@ -198,9 +207,8 @@ func TestAddPeerEndpoint(t *testing.T) {
metrics.PeersKnown.Set(0) metrics.PeersKnown.Set(0)
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
//go hubServer.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -219,8 +227,6 @@ func TestAddPeerEndpoint(t *testing.T) {
log.Println(err) log.Println(err)
} }
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
got1 := hubServer.GetNumPeersExported()() got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()()
if got1 != tt.wantServerOne { if got1 != tt.wantServerOne {
@ -229,6 +235,8 @@ func TestAddPeerEndpoint(t *testing.T) {
if got2 != tt.wantServerTwo { if got2 != tt.wantServerTwo {
t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo)
} }
hubServer.Stop()
hubServer2.Stop()
}) })
} }
@ -236,13 +244,17 @@ func TestAddPeerEndpoint(t *testing.T) {
// TestAddPeerEndpoint2 tests the ability to add peers // TestAddPeerEndpoint2 tests the ability to add peers
func TestAddPeerEndpoint2(t *testing.T) { func TestAddPeerEndpoint2(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = 50052 args2.Port = 50052
args3.Port = 50053 args3.Port = 50053
args.DisableStartNotifier = false
args2.DisableStartNotifier = false
args3.DisableStartNotifier = false
args2.NotifierPort = 18081
args3.NotifierPort = 18082
tests := []struct { tests := []struct {
name string name string
@ -268,7 +280,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -296,9 +308,6 @@ func TestAddPeerEndpoint2(t *testing.T) {
log.Println(err) log.Println(err)
} }
hubServer.GrpcServer.GracefulStop()
hubServer2.GrpcServer.GracefulStop()
hubServer3.GrpcServer.GracefulStop()
got1 := hubServer.GetNumPeersExported()() got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()()
got3 := hubServer3.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()()
@ -311,6 +320,9 @@ func TestAddPeerEndpoint2(t *testing.T) {
if got3 != tt.wantServerThree { if got3 != tt.wantServerThree {
t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree) t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree)
} }
hubServer.Stop()
hubServer2.Stop()
hubServer3.Stop()
}) })
} }
@ -318,13 +330,17 @@ func TestAddPeerEndpoint2(t *testing.T) {
// TestAddPeerEndpoint3 tests the ability to add peers // TestAddPeerEndpoint3 tests the ability to add peers
func TestAddPeerEndpoint3(t *testing.T) { func TestAddPeerEndpoint3(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = 50052 args2.Port = 50052
args3.Port = 50053 args3.Port = 50053
args.DisableStartNotifier = false
args2.DisableStartNotifier = false
args3.DisableStartNotifier = false
args2.NotifierPort = 18081
args3.NotifierPort = 18082
tests := []struct { tests := []struct {
name string name string
@ -350,14 +366,14 @@ func TestAddPeerEndpoint3(t *testing.T) {
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
log.Fatalf("did not connect: %v", err) log.Fatalf("did not connect: %v", err)
} }
conn2, err := grpc.Dial("localhost:50052", conn2, err := grpc.Dial("localhost:50052",
grpc.WithInsecure(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
) )
if err != nil { if err != nil {
@ -386,9 +402,9 @@ func TestAddPeerEndpoint3(t *testing.T) {
log.Println(err) log.Println(err)
} }
hubServer.GrpcServer.GracefulStop() hubServer.Stop()
hubServer2.GrpcServer.GracefulStop() hubServer2.Stop()
hubServer3.GrpcServer.GracefulStop() hubServer3.Stop()
got1 := hubServer.GetNumPeersExported()() got1 := hubServer.GetNumPeersExported()()
got2 := hubServer2.GetNumPeersExported()() got2 := hubServer2.GetNumPeersExported()()
got3 := hubServer3.GetNumPeersExported()() got3 := hubServer3.GetNumPeersExported()()
@ -408,12 +424,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
// TestAddPeer tests the ability to add peers // TestAddPeer tests the ability to add peers
func TestUDPServer(t *testing.T) { func TestUDPServer(t *testing.T) {
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = 50052 args2.Port = 50052
args.DisableStartUDP = false
args2.DisableStartUDP = false args2.DisableStartUDP = false
tests := []struct { tests := []struct {
@ -444,8 +459,8 @@ func TestUDPServer(t *testing.T) {
log.Println(err) log.Println(err)
} }
hubServer.GrpcServer.GracefulStop() hubServer.Stop()
hubServer2.GrpcServer.GracefulStop() hubServer2.Stop()
got1 := hubServer.ExternalIP.String() got1 := hubServer.ExternalIP.String()
if got1 != tt.want { if got1 != tt.want {

View file

@ -185,6 +185,9 @@ func TestHeaders(t *testing.T) {
} }
var resp *BlockHeadersResp var resp *BlockHeadersResp
err := s.Headers(&req, &resp) err := s.Headers(&req, &resp)
if err != nil {
t.Errorf("Headers: %v", err)
}
marshalled, err := json.MarshalIndent(resp, "", " ") marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil { if err != nil {
t.Errorf("height: %v unmarshal err: %v", height, err) t.Errorf("height: %v unmarshal err: %v", height, err)
@ -204,7 +207,7 @@ func TestHeadersSubscribe(t *testing.T) {
return return
} }
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm.start() sm.start()
defer sm.stop() defer sm.stop()
@ -385,7 +388,7 @@ func TestAddressSubscribe(t *testing.T) {
return return
} }
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm.start() sm.start()
defer sm.stop() defer sm.stop()

View file

@ -1,13 +1,19 @@
package server package server
import ( import (
"context"
"fmt"
"github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type ClaimtrieService struct { type ClaimtrieService struct {
DB *db.ReadOnlyDBColumnFamily DB *db.ReadOnlyDBColumnFamily
Server *Server
} }
type ResolveData struct { type ResolveData struct {
@ -18,6 +24,10 @@ type Result struct {
Data string `json:"data"` Data string `json:"data"`
} }
type GetClaimByIDData struct {
ClaimID string `json:"claim_id"`
}
// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'. // Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error { func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error {
log.Println("Resolve") log.Println("Resolve")
@ -25,3 +35,74 @@ func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error
*result = res *result = res
return err return err
} }
// Search is the json rpc endpoint for 'blockchain.claimtrie.search'.
func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error {
log.Println("Search")
if t.Server == nil {
log.Warnln("Server is nil in Search")
*result = nil
return nil
}
ctx := context.Background()
res, err := t.Server.Search(ctx, args)
*result = res
return err
}
// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'.
func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error {
log.Println("GetClaimByID")
if len(args.ClaimID) != 40 {
*result = nil
return fmt.Errorf("len(claim_id) != 40")
}
rows, extras, err := t.DB.GetClaimByID(args.ClaimID)
if err != nil {
*result = nil
return err
}
metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc()
// FIXME: this has txos and extras and so does GetClaimById?
allTxos := make([]*pb.Output, 0)
allExtraTxos := make([]*pb.Output, 0)
for _, row := range rows {
txos, extraTxos, err := row.ToOutputs()
if err != nil {
*result = nil
return err
}
// TODO: there may be a more efficient way to do this.
allTxos = append(allTxos, txos...)
allExtraTxos = append(allExtraTxos, extraTxos...)
}
for _, extra := range extras {
txos, extraTxos, err := extra.ToOutputs()
if err != nil {
*result = nil
return err
}
// TODO: there may be a more efficient way to do this.
allTxos = append(allTxos, txos...)
allExtraTxos = append(allExtraTxos, extraTxos...)
}
res := &pb.Outputs{
Txos: allTxos,
ExtraTxos: allExtraTxos,
Total: uint32(len(allTxos) + len(allExtraTxos)),
Offset: 0, //TODO
Blocked: nil, //TODO
BlockedTotal: 0, //TODO
}
log.Warn(res)
*result = res
return nil
}

View file

@ -0,0 +1,39 @@
package server
import (
"errors"
log "github.com/sirupsen/logrus"
)
type PeersService struct {
Server *Server
// needed for subscribe/unsubscribe
sessionMgr *sessionManager
session *session
}
type PeersSubscribeReq struct {
Ip string `json:"ip"`
Host string `json:"host"`
Details []string `json:"details"`
}
type PeersSubscribeResp string
// Features is the json rpc endpoint for 'server.peers.subcribe'.
func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error {
log.Println("PeersSubscribe")
// var port = "50001"
// FIXME: Get the actual port from the request details
if t.sessionMgr == nil || t.session == nil {
*res = nil
return errors.New("no session, rpc not supported")
}
t.sessionMgr.peersSubscribe(t.session, true /*subscribe*/)
*res = nil
return nil
}

View file

@ -91,7 +91,7 @@ fail1:
s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json")
// Register "blockchain.claimtrie.*"" handlers. // Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{s.DB} claimtrieSvc := &ClaimtrieService{s.DB, s}
err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
@ -134,6 +134,14 @@ fail1:
goto fail2 goto fail2
} }
// Register "server.peers" handlers.
peersSvc := &PeersService{Server: s}
err = s1.RegisterTCPService(peersSvc, "server_peers")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
r := gorilla_mux.NewRouter() r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1) r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10) port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)

View file

@ -2,6 +2,7 @@ package server
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"net" "net"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
@ -53,10 +54,16 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error {
// RunNotifier Runs the notfying action forever // RunNotifier Runs the notfying action forever
func (s *Server) RunNotifier() error { func (s *Server) RunNotifier() error {
for notification := range s.NotifierChan { for notification := range s.NotifierChan {
switch notification.(type) { switch note := notification.(type) {
case internal.HeightHash: case internal.HeightHash:
heightHash, _ := notification.(internal.HeightHash) heightHash := note
s.DoNotify(&heightHash) s.DoNotify(&heightHash)
// Do we need this?
// case peerNotification:
// peer, _ := notification.(peerNotification)
// s.notifyPeerSubs(&Peer{Address: peer.address, Port: peer.port})
default:
logrus.Warn("unknown notification type")
} }
s.sessionManager.doNotify(notification) s.sessionManager.doNotify(notification)
} }
@ -65,7 +72,8 @@ func (s *Server) RunNotifier() error {
// NotifierServer implements the TCP protocol for height/blockheader notifications // NotifierServer implements the TCP protocol for height/blockheader notifications
func (s *Server) NotifierServer() error { func (s *Server) NotifierServer() error {
address := ":" + s.Args.NotifierPort s.Grp.Add(1)
address := ":" + fmt.Sprintf("%d", s.Args.NotifierPort)
addr, err := net.ResolveTCPAddr("tcp", address) addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil { if err != nil {
return err return err
@ -77,11 +85,27 @@ func (s *Server) NotifierServer() error {
} }
defer listen.Close() defer listen.Close()
rdyCh := make(chan bool)
for { for {
var conn net.Conn
var err error
logrus.Info("Waiting for connection") logrus.Info("Waiting for connection")
conn, err := listen.Accept()
go func() {
conn, err = listen.Accept()
rdyCh <- true
}()
select {
case <-s.Grp.Ch():
s.Grp.Done()
return nil
case <-rdyCh:
logrus.Info("Connection accepted")
}
if err != nil { if err != nil {
logrus.Warn(err) logrus.Warn(err)
continue continue

View file

@ -15,6 +15,26 @@ import (
const defaultBufferSize = 1024 const defaultBufferSize = 1024
func subReady(s *server.Server) error {
sleepTime := time.Millisecond * 100
for {
if sleepTime > time.Second {
return fmt.Errorf("timeout")
}
s.HeightSubsMut.RLock()
if len(s.HeightSubs) > 0 {
s.HeightSubsMut.RUnlock()
return nil
}
s.HeightSubsMut.RUnlock()
logrus.Warn("waiting for subscriber")
time.Sleep(sleepTime)
sleepTime = sleepTime * 2
}
}
func tcpConnReady(addr string) (net.Conn, error) { func tcpConnReady(addr string) (net.Conn, error) {
sleepTime := time.Millisecond * 100 sleepTime := time.Millisecond * 100
for { for {
@ -48,14 +68,13 @@ func tcpRead(conn net.Conn) ([]byte, error) {
func TestNotifierServer(t *testing.T) { func TestNotifierServer(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
// ctx := context.Background()
ctx := stop.NewDebug() ctx := stop.NewDebug()
hub := server.MakeHubServer(ctx, args) hub := server.MakeHubServer(ctx, args)
go hub.NotifierServer() go hub.NotifierServer()
go hub.RunNotifier() go hub.RunNotifier()
addr := fmt.Sprintf(":%s", args.NotifierPort) addr := fmt.Sprintf(":%d", args.NotifierPort)
logrus.Info(addr) logrus.Info(addr)
conn, err := tcpConnReady(addr) conn, err := tcpConnReady(addr)
if err != nil { if err != nil {
@ -77,7 +96,10 @@ func TestNotifierServer(t *testing.T) {
// Hacky but needed because if the reader isn't ready // Hacky but needed because if the reader isn't ready
// before the writer sends it won't get the data // before the writer sends it won't get the data
time.Sleep(time.Second) err = subReady(hub)
if err != nil {
t.Fatal(err)
}
hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
logrus.Warn("sending hash") logrus.Warn("sending hash")

View file

@ -7,8 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
"io/ioutil" golog "log"
"log"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -27,7 +26,7 @@ import (
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
logrus "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
) )
@ -56,6 +55,7 @@ type Server struct {
HeightSubsMut sync.RWMutex HeightSubsMut sync.RWMutex
NotifierChan chan interface{} NotifierChan chan interface{}
Grp *stop.Group Grp *stop.Group
notiferListener *net.TCPListener
sessionManager *sessionManager sessionManager *sessionManager
pb.UnimplementedHubServer pb.UnimplementedHubServer
} }
@ -153,29 +153,50 @@ func (s *Server) Run() {
} }
} }
func (s *Server) Stop() {
log.Println("Shutting down server...")
if s.EsClient != nil {
log.Println("Stopping es client...")
s.EsClient.Stop()
}
if s.GrpcServer != nil {
log.Println("Stopping grpc server...")
s.GrpcServer.GracefulStop()
}
log.Println("Stopping other server threads...")
s.Grp.StopAndWait()
if s.DB != nil {
log.Println("Stopping database connection...")
s.DB.Shutdown()
}
log.Println("Returning from Stop...")
}
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub") tmpName, err := os.MkdirTemp("", "go-lbry-hub")
if err != nil { if err != nil {
logrus.Info(err) log.Info(err)
log.Fatal(err) log.Fatal(err)
} }
logrus.Info("tmpName", tmpName) log.Info("tmpName", tmpName)
if err != nil { if err != nil {
logrus.Info(err) log.Info(err)
} }
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp) myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
if err != nil { if err != nil {
// Can't load the db, fail loudly // Can't load the db, fail loudly
logrus.Info(err) log.Info(err)
log.Fatalln(err) log.Fatalln(err)
} }
if myDB.LastState != nil { if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion) log.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height) log.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("genesis: %v", myDB.LastState.Genesis.String()) log.Infof("genesis: %v", myDB.LastState.Genesis.String())
logrus.Infof("tip: %v", myDB.LastState.Tip.String()) log.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount) log.Infof("tx count: %v", myDB.LastState.TxCount)
} }
blockingChannelHashes := make([][]byte, 0, 10) blockingChannelHashes := make([][]byte, 0, 10)
@ -186,7 +207,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
for _, id := range args.BlockingChannelIds { for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) log.Warn("Invalid channel id: ", id)
continue continue
} }
blockingChannelHashes = append(blockingChannelHashes, hash) blockingChannelHashes = append(blockingChannelHashes, hash)
@ -196,7 +217,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
for _, id := range args.FilteringChannelIds { for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) log.Warn("Invalid channel id: ", id)
continue continue
} }
filteringChannelHashes = append(filteringChannelHashes, hash) filteringChannelHashes = append(filteringChannelHashes, hash)
@ -207,10 +228,10 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
myDB.FilteringChannelHashes = filteringChannelHashes myDB.FilteringChannelHashes = filteringChannelHashes
if len(filteringIds) > 0 { if len(filteringIds) > 0 {
logrus.Infof("filtering claims reposted by channels: %+s", filteringIds) log.Infof("filtering claims reposted by channels: %+s", filteringIds)
} }
if len(blockingIds) > 0 { if len(blockingIds) > 0 {
logrus.Infof("blocking claims reposted by channels: %+s", blockingIds) log.Infof("blocking claims reposted by channels: %+s", blockingIds)
} }
return myDB, nil return myDB, nil
@ -234,7 +255,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
var client *elastic.Client = nil var client *elastic.Client = nil
if !args.DisableEs { if !args.DisableEs {
esUrl := args.EsHost + ":" + args.EsPort esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
opts := []elastic.ClientOptionFunc{ opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true), elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60), elastic.SetSnifferTimeoutStartup(time.Second * 60),
@ -242,7 +263,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
elastic.SetURL(esUrl), elastic.SetURL(esUrl),
} }
if args.Debug { if args.Debug {
opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0))) opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
} }
client, err = elastic.NewClient(opts...) client, err = elastic.NewClient(opts...)
if err != nil { if err != nil {
@ -271,7 +292,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
if !args.DisableResolve { if !args.DisableResolve {
myDB, err = LoadDatabase(args, grp) myDB, err = LoadDatabase(args, grp)
if err != nil { if err != nil {
logrus.Warning(err) log.Warning(err)
} }
} }
@ -302,7 +323,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
chain := chaincfg.MainNetParams chain := chaincfg.MainNetParams
if dbChain != nil && cliChain != nil { if dbChain != nil && cliChain != nil {
if dbChain != cliChain { if dbChain != cliChain {
logrus.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name) log.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name)
} }
chain = *dbChain chain = *dbChain
} else if dbChain != nil { } else if dbChain != nil {
@ -310,7 +331,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
} else if cliChain != nil { } else if cliChain != nil {
chain = *cliChain chain = *cliChain
} }
logrus.Infof("network: %v", chain.Name) log.Infof("network: %v", chain.Name)
args.GenesisHash = chain.GenesisHash.String() args.GenesisHash = chain.GenesisHash.String()
@ -338,34 +359,36 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
ExternalIP: net.IPv4(127, 0, 0, 1), ExternalIP: net.IPv4(127, 0, 0, 1),
HeightSubs: make(map[net.Addr]net.Conn), HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{}, HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}), NotifierChan: make(chan interface{}, 1),
Grp: grp, Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), sessionManager: nil,
} }
// FIXME: HACK
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain)
// Start up our background services // Start up our background services
if !args.DisableResolve && !args.DisableRocksDBRefresh { if !args.DisableResolve && !args.DisableRocksDBRefresh {
logrus.Info("Running detect changes") log.Info("Running detect changes")
myDB.RunDetectChanges(s.NotifierChan) myDB.RunDetectChanges(s.NotifierChan)
} }
if !args.DisableBlockingAndFiltering { if !args.DisableBlockingAndFiltering {
myDB.RunGetBlocksAndFilters() myDB.RunGetBlocksAndFilters()
} }
if !args.DisableStartPrometheus { if !args.DisableStartPrometheus {
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics") go s.prometheusEndpoint(fmt.Sprintf("%d", s.Args.PrometheusPort), "metrics")
} }
if !args.DisableStartUDP { if !args.DisableStartUDP {
go func() { go func() {
err := s.UDPServer(s.Args.Port) err := s.UDPServer(s.Args.Port)
if err != nil { if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err) log.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
} }
}() }()
if s.Args.JSONRPCPort != 0 { if s.Args.JSONRPCPort != 0 {
go func() { go func() {
err := s.UDPServer(s.Args.JSONRPCPort) err := s.UDPServer(s.Args.JSONRPCPort)
if err != nil { if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err) log.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
} }
}() }()
} }
@ -409,7 +432,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
// for this hub to allow for metric tracking. // for this hub to allow for metric tracking.
func (s *Server) prometheusEndpoint(port string, endpoint string) { func (s *Server) prometheusEndpoint(port string, endpoint string) {
http.Handle("/"+endpoint, promhttp.Handler()) http.Handle("/"+endpoint, promhttp.Handler())
log.Println(fmt.Sprintf("listening on :%s /%s", port, endpoint)) log.Printf("listening on :%s /%s\n", port, endpoint)
err := http.ListenAndServe(":"+port, nil) err := http.ListenAndServe(":"+port, nil)
log.Fatalln("Shouldn't happen??!?!", err) log.Fatalln("Shouldn't happen??!?!", err)
} }
@ -567,7 +590,7 @@ func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs,
BlockedTotal: 0, //TODO BlockedTotal: 0, //TODO
} }
logrus.Warn(res) log.Warn(res)
return res, nil return res, nil
} }

View file

@ -14,6 +14,6 @@ func (s *Server) GetNumPeersExported() func() int64 {
return s.getNumPeers return s.getNumPeers
} }
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(db, args, grp, chain) return newSessionManager(server, db, args, grp, chain)
} }

View file

@ -33,6 +33,11 @@ type hashXNotification struct {
statusStr string statusStr string
} }
type peerNotification struct {
address string
port string
}
type session struct { type session struct {
id uintptr id uintptr
addr net.Addr addr net.Addr
@ -41,6 +46,8 @@ type session struct {
hashXSubs map[[HASHX_LEN]byte]string hashXSubs map[[HASHX_LEN]byte]string
// headersSub indicates header subscription // headersSub indicates header subscription
headersSub bool headersSub bool
// peersSub indicates peer subscription
peersSub bool
// headersSubRaw indicates the header subscription mode // headersSubRaw indicates the header subscription mode
headersSubRaw bool headersSubRaw bool
// client provides the ability to send notifications // client provides the ability to send notifications
@ -55,12 +62,11 @@ type session struct {
func (s *session) doNotify(notification interface{}) { func (s *session) doNotify(notification interface{}) {
var method string var method string
var params interface{} var params interface{}
switch notification.(type) { switch note := notification.(type) {
case headerNotification: case headerNotification:
if !s.headersSub { if !s.headersSub {
return return
} }
note, _ := notification.(headerNotification)
heightHash := note.HeightHash heightHash := note.HeightHash
method = "blockchain.headers.subscribe" method = "blockchain.headers.subscribe"
if s.headersSubRaw { if s.headersSubRaw {
@ -80,7 +86,6 @@ func (s *session) doNotify(notification interface{}) {
params = header params = header
} }
case hashXNotification: case hashXNotification:
note, _ := notification.(hashXNotification)
orig, ok := s.hashXSubs[note.hashX] orig, ok := s.hashXSubs[note.hashX]
if !ok { if !ok {
return return
@ -95,6 +100,13 @@ func (s *session) doNotify(notification interface{}) {
status = hex.EncodeToString(note.status) status = hex.EncodeToString(note.status)
} }
params = []string{orig, status} params = []string{orig, status}
case peerNotification:
if !s.peersSub {
return
}
method = "server.peers.subscribe"
params = []string{note.address, note.port}
default: default:
log.Warnf("unknown notification type: %v", notification) log.Warnf("unknown notification type: %v", notification)
return return
@ -126,14 +138,17 @@ type sessionManager struct {
manageTicker *time.Ticker manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily db *db.ReadOnlyDBColumnFamily
args *Args args *Args
server *Server
chain *chaincfg.Params chain *chaincfg.Params
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
peerSubs sessionMap
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe' // headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap headerSubs sessionMap
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe' // hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
hashXSubs map[[HASHX_LEN]byte]sessionMap hashXSubs map[[HASHX_LEN]byte]sessionMap
} }
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{ return &sessionManager{
sessions: make(sessionMap), sessions: make(sessionMap),
grp: grp, grp: grp,
@ -142,7 +157,9 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second), manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db, db: db,
args: args, args: args,
server: server,
chain: chain, chain: chain,
peerSubs: make(sessionMap),
headerSubs: make(sessionMap), headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
} }
@ -211,8 +228,15 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
log.Errorf("RegisterName: %v\n", err) log.Errorf("RegisterName: %v\n", err)
} }
// Register "server.peers" handlers.
peersSvc := &PeersService{Server: sm.server}
err = s1.RegisterName("server.peers", peersSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers. // Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db} claimtrieSvc := &ClaimtrieService{sm.db, sm.server}
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil { if err != nil {
log.Errorf("RegisterName: %v\n", err) log.Errorf("RegisterName: %v\n", err)
@ -286,6 +310,18 @@ func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
return nil, nil return nil, nil
} }
func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) {
sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock()
if subscribe {
sm.peerSubs[sess.id] = sess
sess.peersSub = true
return
}
delete(sm.peerSubs, sess.id)
sess.peersSub = false
}
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
sm.sessionsMut.Lock() sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock() defer sm.sessionsMut.Unlock()
@ -325,16 +361,15 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s
} }
func (sm *sessionManager) doNotify(notification interface{}) { func (sm *sessionManager) doNotify(notification interface{}) {
switch notification.(type) { switch note := notification.(type) {
case internal.HeightHash: case internal.HeightHash:
// The HeightHash notification translates to headerNotification. // The HeightHash notification translates to headerNotification.
notification = &headerNotification{HeightHash: notification.(internal.HeightHash)} notification = &headerNotification{HeightHash: note}
} }
sm.sessionsMut.RLock() sm.sessionsMut.RLock()
var subsCopy sessionMap var subsCopy sessionMap
switch notification.(type) { switch note := notification.(type) {
case headerNotification: case headerNotification:
note, _ := notification.(headerNotification)
subsCopy = sm.headerSubs subsCopy = sm.headerSubs
if len(subsCopy) > 0 { if len(subsCopy) > 0 {
hdr := [HEADER_SIZE]byte{} hdr := [HEADER_SIZE]byte{}
@ -343,7 +378,6 @@ func (sm *sessionManager) doNotify(notification interface{}) {
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:]) note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
} }
case hashXNotification: case hashXNotification:
note, _ := notification.(hashXNotification)
hashXSubs, ok := sm.hashXSubs[note.hashX] hashXSubs, ok := sm.hashXSubs[note.hashX]
if ok { if ok {
subsCopy = hashXSubs subsCopy = hashXSubs
@ -351,6 +385,8 @@ func (sm *sessionManager) doNotify(notification interface{}) {
if len(subsCopy) > 0 { if len(subsCopy) > 0 {
note.statusStr = hex.EncodeToString(note.status) note.statusStr = hex.EncodeToString(note.status)
} }
case peerNotification:
subsCopy = sm.peerSubs
default: default:
log.Warnf("unknown notification type: %v", notification) log.Warnf("unknown notification type: %v", notification)
} }
@ -369,8 +405,10 @@ type sessionServerCodec struct {
// ReadRequestHeader provides ability to rewrite the incoming // ReadRequestHeader provides ability to rewrite the incoming
// request "method" field. For example: // request "method" field. For example:
// blockchain.block.get_header -> blockchain.block.Get_header //
// blockchain.address.listunspent -> blockchain.address.Listunspent // blockchain.block.get_header -> blockchain.block.Get_header
// blockchain.address.listunspent -> blockchain.address.Listunspent
//
// This makes the "method" string compatible with rpc.Server // This makes the "method" string compatible with rpc.Server
// requirements. // requirements.
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error { func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {

View file

@ -49,6 +49,9 @@ func interruptListener() <-chan struct{} {
case sig := <-interruptChannel: case sig := <-interruptChannel:
log.Infof("Received signal (%s). Already "+ log.Infof("Received signal (%s). Already "+
"shutting down...", sig) "shutting down...", sig)
case <-shutdownRequestChannel:
log.Info("Shutdown requested. Already " +
"shutting down...")
} }
} }
}() }()

View file

@ -10,12 +10,9 @@ package main
import ( import (
"os" "os"
"syscall" "syscall"
"github.com/lbryio/lbry.go/v3/extras/stop"
) )
// initsignals sets the signals to be caught by the signal handler // initsignals sets the signals to be caught by the signal handler
func initsignals(stopCh stop.Chan) { func initsignals() {
shutdownRequestChannel = stopCh
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
} }