WIP: Json rpc federation, search/getclaimbyid, and shutdown #76
1
db/db.go
|
@ -670,7 +670,6 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
|
||||||
|
|
||||||
// Shutdown shuts down the db.
|
// Shutdown shuts down the db.
|
||||||
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
||||||
db.Grp.StopAndWait()
|
|
||||||
log.Println("Calling cleanup...")
|
log.Println("Calling cleanup...")
|
||||||
db.Cleanup()
|
db.Cleanup()
|
||||||
log.Println("Leaving Shutdown...")
|
log.Println("Leaving Shutdown...")
|
||||||
|
|
41
db/db_get.go
|
@ -958,7 +958,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
|
||||||
}
|
}
|
||||||
blockTxsCache[txHeight] = txs
|
blockTxsCache[txHeight] = txs
|
||||||
}
|
}
|
||||||
blockTxs, _ := blockTxsCache[txHeight]
|
blockTxs := blockTxsCache[txHeight]
|
||||||
results = append(results, TxMerkle{
|
results = append(results, TxMerkle{
|
||||||
TxHash: txNumKey.TxHash,
|
TxHash: txNumKey.TxHash,
|
||||||
RawTx: txVal.RawTx,
|
RawTx: txVal.RawTx,
|
||||||
|
@ -970,6 +970,45 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *ReadOnlyDBColumnFamily) GetClaimByID(claimID string) ([]*ExpandedResolveResult, []*ExpandedResolveResult, error) {
|
||||||
|
rows := make([]*ExpandedResolveResult, 0)
|
||||||
|
extras := make([]*ExpandedResolveResult, 0)
|
||||||
|
claimHash, err := hex.DecodeString(claimID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := db.FsGetClaimByHash(claimHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
var res = NewExpandedResolveResult()
|
||||||
|
res.Stream = &optionalResolveResultOrError{res: stream}
|
||||||
|
rows = append(rows, res)
|
||||||
|
|
||||||
|
if stream != nil && stream.ChannelHash != nil {
|
||||||
|
channel, err := db.FsGetClaimByHash(stream.ChannelHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
var res = NewExpandedResolveResult()
|
||||||
|
res.Channel = &optionalResolveResultOrError{res: channel}
|
||||||
|
extras = append(extras, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream != nil && stream.RepostedClaimHash != nil {
|
||||||
|
repost, err := db.FsGetClaimByHash(stream.RepostedClaimHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
var res = NewExpandedResolveResult()
|
||||||
|
res.Repost = &optionalResolveResultOrError{res: repost}
|
||||||
|
extras = append(extras, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rows, extras, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
|
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
|
||||||
handle, err := db.EnsureHandle(prefixes.DBState)
|
handle, err := db.EnsureHandle(prefixes.DBState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -14,6 +13,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -12,13 +12,13 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math"
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
|
@ -16,6 +15,7 @@ import (
|
||||||
dbpkg "github.com/lbryio/herald.go/db"
|
dbpkg "github.com/lbryio/herald.go/db"
|
||||||
prefixes "github.com/lbryio/herald.go/db/prefixes"
|
prefixes "github.com/lbryio/herald.go/db/prefixes"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPrefixRegistry(t *testing.T) {
|
func TestPrefixRegistry(t *testing.T) {
|
||||||
|
|
31
main.go
|
@ -3,7 +3,6 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
|
@ -14,6 +13,7 @@ import (
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -29,43 +29,22 @@ func main() {
|
||||||
|
|
||||||
if args.CmdType == server.ServeCmd {
|
if args.CmdType == server.ServeCmd {
|
||||||
// This will cancel goroutines with the server finishes.
|
// This will cancel goroutines with the server finishes.
|
||||||
// ctxWCancel, cancel := context.WithCancel(ctx)
|
|
||||||
// defer cancel()
|
|
||||||
stopGroup := stop.New()
|
stopGroup := stop.New()
|
||||||
// defer stopGroup.Stop()
|
|
||||||
|
|
||||||
initsignals(stopGroup.Ch())
|
initsignals()
|
||||||
interrupt := interruptListener()
|
interrupt := interruptListener()
|
||||||
|
|
||||||
// s := server.MakeHubServer(ctxWCancel, args)
|
|
||||||
s := server.MakeHubServer(stopGroup, args)
|
s := server.MakeHubServer(stopGroup, args)
|
||||||
go s.Run()
|
go s.Run()
|
||||||
|
|
||||||
defer func() {
|
defer s.Stop()
|
||||||
log.Println("Shutting down server...")
|
|
||||||
|
|
||||||
if s.EsClient != nil {
|
|
||||||
log.Println("Stopping es client...")
|
|
||||||
s.EsClient.Stop()
|
|
||||||
}
|
|
||||||
if s.GrpcServer != nil {
|
|
||||||
log.Println("Stopping grpc server...")
|
|
||||||
s.GrpcServer.GracefulStop()
|
|
||||||
}
|
|
||||||
if s.DB != nil {
|
|
||||||
log.Println("Stopping database connection...")
|
|
||||||
s.DB.Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("Returning from main...")
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-interrupt
|
<-interrupt
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
conn, err := grpc.Dial("localhost:"+fmt.Sprintf("%d", args.Port),
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -26,9 +26,9 @@ type Args struct {
|
||||||
DBPath string
|
DBPath string
|
||||||
Chain *string
|
Chain *string
|
||||||
EsHost string
|
EsHost string
|
||||||
EsPort string
|
EsPort int
|
||||||
PrometheusPort string
|
PrometheusPort int
|
||||||
NotifierPort string
|
NotifierPort int
|
||||||
JSONRPCPort int
|
JSONRPCPort int
|
||||||
JSONRPCHTTPPort int
|
JSONRPCHTTPPort int
|
||||||
MaxSessions int
|
MaxSessions int
|
||||||
|
@ -71,9 +71,9 @@ const (
|
||||||
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
||||||
DefaultEsHost = "http://localhost"
|
DefaultEsHost = "http://localhost"
|
||||||
DefaultEsIndex = "claims"
|
DefaultEsIndex = "claims"
|
||||||
DefaultEsPort = "9200"
|
DefaultEsPort = 9200
|
||||||
DefaultPrometheusPort = "2112"
|
DefaultPrometheusPort = 2112
|
||||||
DefaultNotifierPort = "18080"
|
DefaultNotifierPort = 18080
|
||||||
DefaultJSONRPCPort = 50001
|
DefaultJSONRPCPort = 50001
|
||||||
DefaultJSONRPCHTTPPort = 50002
|
DefaultJSONRPCHTTPPort = 50002
|
||||||
DefaultMaxSessions = 10000
|
DefaultMaxSessions = 10000
|
||||||
|
@ -219,9 +219,9 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
|
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
|
||||||
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
|
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
|
||||||
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
|
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
|
||||||
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
||||||
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
||||||
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
|
notifierPort := parser.Int("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
|
||||||
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
|
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
|
||||||
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
|
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
|
||||||
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
|
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
|
||||||
|
@ -334,11 +334,17 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
}
|
}
|
||||||
|
|
||||||
if esPort, ok := environment["ELASTIC_PORT"]; ok {
|
if esPort, ok := environment["ELASTIC_PORT"]; ok {
|
||||||
args.EsPort = esPort
|
args.EsPort, err = strconv.Atoi(esPort)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok {
|
if prometheusPort, ok := environment["GOHUB_PROMETHEUS_PORT"]; ok {
|
||||||
args.PrometheusPort = prometheusPort
|
args.PrometheusPort, err = strconv.Atoi(prometheusPort)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -3,7 +3,6 @@ package server
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"log"
|
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
@ -14,7 +13,9 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/internal/metrics"
|
"github.com/lbryio/herald.go/internal/metrics"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Peer holds relevant information about peers that we know about.
|
// Peer holds relevant information about peers that we know about.
|
||||||
|
@ -99,7 +100,7 @@ retry:
|
||||||
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
|
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
|
||||||
conn, err := grpc.DialContext(ctx,
|
conn, err := grpc.DialContext(ctx,
|
||||||
"0.0.0.0:"+port,
|
"0.0.0.0:"+port,
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -172,7 +173,7 @@ func (s *Server) subscribeToPeer(peer *Peer) error {
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx,
|
conn, err := grpc.DialContext(ctx,
|
||||||
peer.Address+":"+peer.Port,
|
peer.Address+":"+peer.Port,
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -208,7 +209,7 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx,
|
conn, err := grpc.DialContext(ctx,
|
||||||
peer.Address+":"+peer.Port,
|
peer.Address+":"+peer.Port,
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -278,7 +279,7 @@ func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx,
|
conn, err := grpc.DialContext(ctx,
|
||||||
peerToNotify.Address+":"+peerToNotify.Port,
|
peerToNotify.Address+":"+peerToNotify.Port,
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -370,6 +371,10 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
|
||||||
metrics.PeersKnown.Inc()
|
metrics.PeersKnown.Inc()
|
||||||
s.writePeers()
|
s.writePeers()
|
||||||
s.notifyPeerSubs(newPeer)
|
s.notifyPeerSubs(newPeer)
|
||||||
|
// This is weird because we're doing grpc and jsonrpc here.
|
||||||
|
// Do we still want to custom grpc?
|
||||||
|
log.Warn("Sending peer to NotifierChan")
|
||||||
|
s.NotifierChan <- peerNotification{newPeer.Address, newPeer.Port}
|
||||||
|
|
||||||
// Subscribe to all our peers for now
|
// Subscribe to all our peers for now
|
||||||
if subscribe {
|
if subscribe {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -16,13 +15,20 @@ import (
|
||||||
"github.com/lbryio/herald.go/server"
|
"github.com/lbryio/herald.go/server"
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
|
|
||||||
// lineCountFile takes a fileName and counts the number of lines in it.
|
// lineCountFile takes a fileName and counts the number of lines in it.
|
||||||
func lineCountFile(fileName string) int {
|
func lineCountFile(fileName string) int {
|
||||||
f, err := os.Open(fileName)
|
f, err := os.Open(fileName)
|
||||||
defer f.Close()
|
defer func() {
|
||||||
|
err := f.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return 0
|
return 0
|
||||||
|
@ -51,6 +57,7 @@ func TestAddPeer(t *testing.T) {
|
||||||
// ctx := context.Background()
|
// ctx := context.Background()
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
|
args.DisableStartNotifier = false
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -100,6 +107,7 @@ func TestAddPeer(t *testing.T) {
|
||||||
if got != tt.want {
|
if got != tt.want {
|
||||||
t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want)
|
t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want)
|
||||||
}
|
}
|
||||||
|
hubServer.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,10 +115,10 @@ func TestAddPeer(t *testing.T) {
|
||||||
|
|
||||||
// TestPeerWriter tests that peers get written properly
|
// TestPeerWriter tests that peers get written properly
|
||||||
func TestPeerWriter(t *testing.T) {
|
func TestPeerWriter(t *testing.T) {
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args.DisableWritePeers = false
|
args.DisableWritePeers = false
|
||||||
|
args.DisableStartNotifier = false
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -145,17 +153,16 @@ func TestPeerWriter(t *testing.T) {
|
||||||
Port: "50051",
|
Port: "50051",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//log.Printf("Adding peer %+v\n", peer)
|
|
||||||
err := hubServer.AddPeerExported()(peer, false, false)
|
err := hubServer.AddPeerExported()(peer, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//log.Println("Counting lines...")
|
|
||||||
got := lineCountFile(hubServer.Args.PeerFile)
|
got := lineCountFile(hubServer.Args.PeerFile)
|
||||||
if got != tt.want {
|
if got != tt.want {
|
||||||
t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want)
|
t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want)
|
||||||
}
|
}
|
||||||
|
hubServer.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,11 +171,13 @@ func TestPeerWriter(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint tests the ability to add peers
|
// TestAddPeerEndpoint tests the ability to add peers
|
||||||
func TestAddPeerEndpoint(t *testing.T) {
|
func TestAddPeerEndpoint(t *testing.T) {
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
|
args.DisableStartNotifier = false
|
||||||
args2 := server.MakeDefaultTestArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
|
args2.DisableStartNotifier = false
|
||||||
args2.Port = 50052
|
args2.Port = 50052
|
||||||
|
args2.NotifierPort = 18081
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -198,9 +207,8 @@ func TestAddPeerEndpoint(t *testing.T) {
|
||||||
metrics.PeersKnown.Set(0)
|
metrics.PeersKnown.Set(0)
|
||||||
go hubServer.Run()
|
go hubServer.Run()
|
||||||
go hubServer2.Run()
|
go hubServer2.Run()
|
||||||
//go hubServer.Run()
|
|
||||||
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -219,8 +227,6 @@ func TestAddPeerEndpoint(t *testing.T) {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hubServer.GrpcServer.GracefulStop()
|
|
||||||
hubServer2.GrpcServer.GracefulStop()
|
|
||||||
got1 := hubServer.GetNumPeersExported()()
|
got1 := hubServer.GetNumPeersExported()()
|
||||||
got2 := hubServer2.GetNumPeersExported()()
|
got2 := hubServer2.GetNumPeersExported()()
|
||||||
if got1 != tt.wantServerOne {
|
if got1 != tt.wantServerOne {
|
||||||
|
@ -229,6 +235,8 @@ func TestAddPeerEndpoint(t *testing.T) {
|
||||||
if got2 != tt.wantServerTwo {
|
if got2 != tt.wantServerTwo {
|
||||||
t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo)
|
t.Errorf("len(hubServer2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo)
|
||||||
}
|
}
|
||||||
|
hubServer.Stop()
|
||||||
|
hubServer2.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,13 +244,17 @@ func TestAddPeerEndpoint(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint2 tests the ability to add peers
|
// TestAddPeerEndpoint2 tests the ability to add peers
|
||||||
func TestAddPeerEndpoint2(t *testing.T) {
|
func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args2 := server.MakeDefaultTestArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
args3 := server.MakeDefaultTestArgs()
|
args3 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = 50052
|
args2.Port = 50052
|
||||||
args3.Port = 50053
|
args3.Port = 50053
|
||||||
|
args.DisableStartNotifier = false
|
||||||
|
args2.DisableStartNotifier = false
|
||||||
|
args3.DisableStartNotifier = false
|
||||||
|
args2.NotifierPort = 18081
|
||||||
|
args3.NotifierPort = 18082
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -268,7 +280,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
go hubServer2.Run()
|
go hubServer2.Run()
|
||||||
go hubServer3.Run()
|
go hubServer3.Run()
|
||||||
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -296,9 +308,6 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hubServer.GrpcServer.GracefulStop()
|
|
||||||
hubServer2.GrpcServer.GracefulStop()
|
|
||||||
hubServer3.GrpcServer.GracefulStop()
|
|
||||||
got1 := hubServer.GetNumPeersExported()()
|
got1 := hubServer.GetNumPeersExported()()
|
||||||
got2 := hubServer2.GetNumPeersExported()()
|
got2 := hubServer2.GetNumPeersExported()()
|
||||||
got3 := hubServer3.GetNumPeersExported()()
|
got3 := hubServer3.GetNumPeersExported()()
|
||||||
|
@ -311,6 +320,9 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
if got3 != tt.wantServerThree {
|
if got3 != tt.wantServerThree {
|
||||||
t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree)
|
t.Errorf("len(hubServer3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree)
|
||||||
}
|
}
|
||||||
|
hubServer.Stop()
|
||||||
|
hubServer2.Stop()
|
||||||
|
hubServer3.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,13 +330,17 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint3 tests the ability to add peers
|
// TestAddPeerEndpoint3 tests the ability to add peers
|
||||||
func TestAddPeerEndpoint3(t *testing.T) {
|
func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args2 := server.MakeDefaultTestArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
args3 := server.MakeDefaultTestArgs()
|
args3 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = 50052
|
args2.Port = 50052
|
||||||
args3.Port = 50053
|
args3.Port = 50053
|
||||||
|
args.DisableStartNotifier = false
|
||||||
|
args2.DisableStartNotifier = false
|
||||||
|
args3.DisableStartNotifier = false
|
||||||
|
args2.NotifierPort = 18081
|
||||||
|
args3.NotifierPort = 18082
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -350,14 +366,14 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
go hubServer2.Run()
|
go hubServer2.Run()
|
||||||
go hubServer3.Run()
|
go hubServer3.Run()
|
||||||
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("did not connect: %v", err)
|
log.Fatalf("did not connect: %v", err)
|
||||||
}
|
}
|
||||||
conn2, err := grpc.Dial("localhost:50052",
|
conn2, err := grpc.Dial("localhost:50052",
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -386,9 +402,9 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hubServer.GrpcServer.GracefulStop()
|
hubServer.Stop()
|
||||||
hubServer2.GrpcServer.GracefulStop()
|
hubServer2.Stop()
|
||||||
hubServer3.GrpcServer.GracefulStop()
|
hubServer3.Stop()
|
||||||
got1 := hubServer.GetNumPeersExported()()
|
got1 := hubServer.GetNumPeersExported()()
|
||||||
got2 := hubServer2.GetNumPeersExported()()
|
got2 := hubServer2.GetNumPeersExported()()
|
||||||
got3 := hubServer3.GetNumPeersExported()()
|
got3 := hubServer3.GetNumPeersExported()()
|
||||||
|
@ -408,12 +424,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeer tests the ability to add peers
|
// TestAddPeer tests the ability to add peers
|
||||||
func TestUDPServer(t *testing.T) {
|
func TestUDPServer(t *testing.T) {
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args.DisableStartUDP = false
|
|
||||||
args2 := server.MakeDefaultTestArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = 50052
|
args2.Port = 50052
|
||||||
|
args.DisableStartUDP = false
|
||||||
args2.DisableStartUDP = false
|
args2.DisableStartUDP = false
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -444,8 +459,8 @@ func TestUDPServer(t *testing.T) {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hubServer.GrpcServer.GracefulStop()
|
hubServer.Stop()
|
||||||
hubServer2.GrpcServer.GracefulStop()
|
hubServer2.Stop()
|
||||||
|
|
||||||
got1 := hubServer.ExternalIP.String()
|
got1 := hubServer.ExternalIP.String()
|
||||||
if got1 != tt.want {
|
if got1 != tt.want {
|
||||||
|
|
|
@ -185,6 +185,9 @@ func TestHeaders(t *testing.T) {
|
||||||
}
|
}
|
||||||
var resp *BlockHeadersResp
|
var resp *BlockHeadersResp
|
||||||
err := s.Headers(&req, &resp)
|
err := s.Headers(&req, &resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Headers: %v", err)
|
||||||
|
}
|
||||||
marshalled, err := json.MarshalIndent(resp, "", " ")
|
marshalled, err := json.MarshalIndent(resp, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("height: %v unmarshal err: %v", height, err)
|
t.Errorf("height: %v unmarshal err: %v", height, err)
|
||||||
|
@ -204,7 +207,7 @@ func TestHeadersSubscribe(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
@ -385,7 +388,7 @@ func TestAddressSubscribe(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,19 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/db"
|
"github.com/lbryio/herald.go/db"
|
||||||
|
"github.com/lbryio/herald.go/internal/metrics"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClaimtrieService struct {
|
type ClaimtrieService struct {
|
||||||
DB *db.ReadOnlyDBColumnFamily
|
DB *db.ReadOnlyDBColumnFamily
|
||||||
|
Server *Server
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResolveData struct {
|
type ResolveData struct {
|
||||||
|
@ -18,6 +24,10 @@ type Result struct {
|
||||||
Data string `json:"data"`
|
Data string `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GetClaimByIDData struct {
|
||||||
|
ClaimID string `json:"claim_id"`
|
||||||
|
}
|
||||||
|
|
||||||
// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
|
// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
|
||||||
func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error {
|
func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error {
|
||||||
log.Println("Resolve")
|
log.Println("Resolve")
|
||||||
|
@ -25,3 +35,74 @@ func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error
|
||||||
*result = res
|
*result = res
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Search is the json rpc endpoint for 'blockchain.claimtrie.search'.
|
||||||
|
func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error {
|
||||||
|
log.Println("Search")
|
||||||
|
if t.Server == nil {
|
||||||
|
log.Warnln("Server is nil in Search")
|
||||||
|
*result = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
res, err := t.Server.Search(ctx, args)
|
||||||
|
*result = res
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'.
|
||||||
|
func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error {
|
||||||
|
log.Println("GetClaimByID")
|
||||||
|
if len(args.ClaimID) != 40 {
|
||||||
|
*result = nil
|
||||||
|
return fmt.Errorf("len(claim_id) != 40")
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, extras, err := t.DB.GetClaimByID(args.ClaimID)
|
||||||
|
if err != nil {
|
||||||
|
*result = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc()
|
||||||
|
|
||||||
|
// FIXME: this has txos and extras and so does GetClaimById?
|
||||||
|
allTxos := make([]*pb.Output, 0)
|
||||||
Yup, this definitely needs to be refactored in the future. Yup, this definitely needs to be refactored in the future.
|
|||||||
|
allExtraTxos := make([]*pb.Output, 0)
|
||||||
|
|
||||||
|
for _, row := range rows {
|
||||||
|
txos, extraTxos, err := row.ToOutputs()
|
||||||
|
if err != nil {
|
||||||
|
*result = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// TODO: there may be a more efficient way to do this.
|
||||||
|
allTxos = append(allTxos, txos...)
|
||||||
|
allExtraTxos = append(allExtraTxos, extraTxos...)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, extra := range extras {
|
||||||
|
txos, extraTxos, err := extra.ToOutputs()
|
||||||
|
if err != nil {
|
||||||
|
*result = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// TODO: there may be a more efficient way to do this.
|
||||||
|
allTxos = append(allTxos, txos...)
|
||||||
|
allExtraTxos = append(allExtraTxos, extraTxos...)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := &pb.Outputs{
|
||||||
|
Txos: allTxos,
|
||||||
|
ExtraTxos: allExtraTxos,
|
||||||
|
Total: uint32(len(allTxos) + len(allExtraTxos)),
|
||||||
|
Offset: 0, //TODO
|
||||||
|
Blocked: nil, //TODO
|
||||||
|
BlockedTotal: 0, //TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warn(res)
|
||||||
|
|
||||||
|
*result = res
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
39
server/jsonrpc_federation.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
package server
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
import (
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
"errors"
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
log "github.com/sirupsen/logrus"
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
)
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
type PeersService struct {
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
Server *Server
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
// needed for subscribe/unsubscribe
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
sessionMgr *sessionManager
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
session *session
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
}
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
type PeersSubscribeReq struct {
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
Ip string `json:"ip"`
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
Host string `json:"host"`
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
Details []string `json:"details"`
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
}
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
type PeersSubscribeResp string
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
// Features is the json rpc endpoint for 'server.peers.subcribe'.
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error {
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
log.Println("PeersSubscribe")
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
// var port = "50001"
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
// FIXME: Get the actual port from the request details
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
if t.sessionMgr == nil || t.session == nil {
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
*res = nil
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
return errors.New("no session, rpc not supported")
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
}
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
t.sessionMgr.peersSubscribe(t.session, true /*subscribe*/)
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
*res = nil
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
return nil
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|||||||
|
}
|
||||||
Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
Ahh.... I meant register the subscription in the The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism.
The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
|
|
@ -91,7 +91,7 @@ fail1:
|
||||||
s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json")
|
s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json")
|
||||||
|
|
||||||
// Register "blockchain.claimtrie.*"" handlers.
|
// Register "blockchain.claimtrie.*"" handlers.
|
||||||
claimtrieSvc := &ClaimtrieService{s.DB}
|
claimtrieSvc := &ClaimtrieService{s.DB, s}
|
||||||
err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie")
|
err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("RegisterTCPService: %v\n", err)
|
log.Errorf("RegisterTCPService: %v\n", err)
|
||||||
|
@ -134,6 +134,14 @@ fail1:
|
||||||
goto fail2
|
goto fail2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register "server.peers" handlers.
|
||||||
|
peersSvc := &PeersService{Server: s}
|
||||||
|
err = s1.RegisterTCPService(peersSvc, "server_peers")
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("RegisterTCPService: %v\n", err)
|
||||||
|
goto fail2
|
||||||
|
}
|
||||||
|
|
||||||
r := gorilla_mux.NewRouter()
|
r := gorilla_mux.NewRouter()
|
||||||
r.Handle("/rpc", s1)
|
r.Handle("/rpc", s1)
|
||||||
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)
|
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
|
@ -53,10 +54,16 @@ func (s *Server) DoNotify(heightHash *internal.HeightHash) error {
|
||||||
// RunNotifier Runs the notfying action forever
|
// RunNotifier Runs the notfying action forever
|
||||||
func (s *Server) RunNotifier() error {
|
func (s *Server) RunNotifier() error {
|
||||||
for notification := range s.NotifierChan {
|
for notification := range s.NotifierChan {
|
||||||
switch notification.(type) {
|
switch note := notification.(type) {
|
||||||
case internal.HeightHash:
|
case internal.HeightHash:
|
||||||
heightHash, _ := notification.(internal.HeightHash)
|
heightHash := note
|
||||||
s.DoNotify(&heightHash)
|
s.DoNotify(&heightHash)
|
||||||
|
// Do we need this?
|
||||||
|
// case peerNotification:
|
||||||
|
// peer, _ := notification.(peerNotification)
|
||||||
|
// s.notifyPeerSubs(&Peer{Address: peer.address, Port: peer.port})
|
||||||
|
default:
|
||||||
|
logrus.Warn("unknown notification type")
|
||||||
}
|
}
|
||||||
s.sessionManager.doNotify(notification)
|
s.sessionManager.doNotify(notification)
|
||||||
}
|
}
|
||||||
|
@ -65,7 +72,8 @@ func (s *Server) RunNotifier() error {
|
||||||
|
|
||||||
// NotifierServer implements the TCP protocol for height/blockheader notifications
|
// NotifierServer implements the TCP protocol for height/blockheader notifications
|
||||||
func (s *Server) NotifierServer() error {
|
func (s *Server) NotifierServer() error {
|
||||||
address := ":" + s.Args.NotifierPort
|
s.Grp.Add(1)
|
||||||
|
address := ":" + fmt.Sprintf("%d", s.Args.NotifierPort)
|
||||||
addr, err := net.ResolveTCPAddr("tcp", address)
|
addr, err := net.ResolveTCPAddr("tcp", address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
Probably need Probably need `peerNotification` case here.
Maybe. We need to refactor this whole section of code, but I think that ends up getting handled by the catch-all Maybe. We need to refactor this whole section of code, but I think that ends up getting handled by the catch-all `s.sessionManager.doNotify`
Never mind... I have been reading the code more, and NotifierChan is only populated with HeightHash notifications coming from observing the DB. Never mind... I have been reading the code more, and NotifierChan is only populated with HeightHash notifications coming from observing the DB.
I send it the new peers in federation.go I send it the new peers in federation.go `addPeer`. So I'm pretty sure this does get notified of new peers.
|
|||||||
|
@ -77,11 +85,27 @@ func (s *Server) NotifierServer() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
defer listen.Close()
|
defer listen.Close()
|
||||||
|
rdyCh := make(chan bool)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
var conn net.Conn
|
||||||
|
var err error
|
||||||
|
|
||||||
logrus.Info("Waiting for connection")
|
logrus.Info("Waiting for connection")
|
||||||
conn, err := listen.Accept()
|
|
||||||
|
go func() {
|
||||||
|
conn, err = listen.Accept()
|
||||||
|
rdyCh <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-s.Grp.Ch():
|
||||||
|
s.Grp.Done()
|
||||||
|
return nil
|
||||||
|
case <-rdyCh:
|
||||||
|
logrus.Info("Connection accepted")
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warn(err)
|
logrus.Warn(err)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -15,6 +15,26 @@ import (
|
||||||
|
|
||||||
const defaultBufferSize = 1024
|
const defaultBufferSize = 1024
|
||||||
|
|
||||||
|
func subReady(s *server.Server) error {
|
||||||
|
sleepTime := time.Millisecond * 100
|
||||||
|
for {
|
||||||
|
if sleepTime > time.Second {
|
||||||
|
return fmt.Errorf("timeout")
|
||||||
|
}
|
||||||
|
s.HeightSubsMut.RLock()
|
||||||
|
if len(s.HeightSubs) > 0 {
|
||||||
|
s.HeightSubsMut.RUnlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
s.HeightSubsMut.RUnlock()
|
||||||
|
|
||||||
|
logrus.Warn("waiting for subscriber")
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
sleepTime = sleepTime * 2
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func tcpConnReady(addr string) (net.Conn, error) {
|
func tcpConnReady(addr string) (net.Conn, error) {
|
||||||
sleepTime := time.Millisecond * 100
|
sleepTime := time.Millisecond * 100
|
||||||
for {
|
for {
|
||||||
|
@ -48,14 +68,13 @@ func tcpRead(conn net.Conn) ([]byte, error) {
|
||||||
|
|
||||||
func TestNotifierServer(t *testing.T) {
|
func TestNotifierServer(t *testing.T) {
|
||||||
args := server.MakeDefaultTestArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
// ctx := context.Background()
|
|
||||||
ctx := stop.NewDebug()
|
ctx := stop.NewDebug()
|
||||||
hub := server.MakeHubServer(ctx, args)
|
hub := server.MakeHubServer(ctx, args)
|
||||||
|
|
||||||
go hub.NotifierServer()
|
go hub.NotifierServer()
|
||||||
go hub.RunNotifier()
|
go hub.RunNotifier()
|
||||||
|
|
||||||
addr := fmt.Sprintf(":%s", args.NotifierPort)
|
addr := fmt.Sprintf(":%d", args.NotifierPort)
|
||||||
logrus.Info(addr)
|
logrus.Info(addr)
|
||||||
conn, err := tcpConnReady(addr)
|
conn, err := tcpConnReady(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -77,7 +96,10 @@ func TestNotifierServer(t *testing.T) {
|
||||||
|
|
||||||
// Hacky but needed because if the reader isn't ready
|
// Hacky but needed because if the reader isn't ready
|
||||||
// before the writer sends it won't get the data
|
// before the writer sends it won't get the data
|
||||||
time.Sleep(time.Second)
|
err = subReady(hub)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
|
hash, _ := hex.DecodeString("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
|
||||||
logrus.Warn("sending hash")
|
logrus.Warn("sending hash")
|
||||||
|
|
|
@ -7,8 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash"
|
"hash"
|
||||||
"io/ioutil"
|
golog "log"
|
||||||
"log"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -27,7 +26,7 @@ import (
|
||||||
"github.com/olivere/elastic/v7"
|
"github.com/olivere/elastic/v7"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
logrus "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/reflection"
|
"google.golang.org/grpc/reflection"
|
||||||
)
|
)
|
||||||
|
@ -56,6 +55,7 @@ type Server struct {
|
||||||
HeightSubsMut sync.RWMutex
|
HeightSubsMut sync.RWMutex
|
||||||
NotifierChan chan interface{}
|
NotifierChan chan interface{}
|
||||||
Grp *stop.Group
|
Grp *stop.Group
|
||||||
|
notiferListener *net.TCPListener
|
||||||
sessionManager *sessionManager
|
sessionManager *sessionManager
|
||||||
pb.UnimplementedHubServer
|
pb.UnimplementedHubServer
|
||||||
}
|
}
|
||||||
|
@ -153,29 +153,50 @@ func (s *Server) Run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) Stop() {
|
||||||
|
log.Println("Shutting down server...")
|
||||||
|
|
||||||
|
if s.EsClient != nil {
|
||||||
|
log.Println("Stopping es client...")
|
||||||
|
s.EsClient.Stop()
|
||||||
|
}
|
||||||
|
if s.GrpcServer != nil {
|
||||||
|
log.Println("Stopping grpc server...")
|
||||||
|
s.GrpcServer.GracefulStop()
|
||||||
|
}
|
||||||
|
log.Println("Stopping other server threads...")
|
||||||
|
s.Grp.StopAndWait()
|
||||||
|
if s.DB != nil {
|
||||||
|
log.Println("Stopping database connection...")
|
||||||
|
s.DB.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Returning from Stop...")
|
||||||
|
}
|
||||||
|
|
||||||
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
|
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
|
||||||
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
|
tmpName, err := os.MkdirTemp("", "go-lbry-hub")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Info(err)
|
log.Info(err)
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
logrus.Info("tmpName", tmpName)
|
log.Info("tmpName", tmpName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Info(err)
|
log.Info(err)
|
||||||
}
|
}
|
||||||
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
|
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Can't load the db, fail loudly
|
// Can't load the db, fail loudly
|
||||||
logrus.Info(err)
|
log.Info(err)
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if myDB.LastState != nil {
|
if myDB.LastState != nil {
|
||||||
logrus.Infof("DB version: %v", myDB.LastState.DBVersion)
|
log.Infof("DB version: %v", myDB.LastState.DBVersion)
|
||||||
logrus.Infof("height: %v", myDB.LastState.Height)
|
log.Infof("height: %v", myDB.LastState.Height)
|
||||||
logrus.Infof("genesis: %v", myDB.LastState.Genesis.String())
|
log.Infof("genesis: %v", myDB.LastState.Genesis.String())
|
||||||
logrus.Infof("tip: %v", myDB.LastState.Tip.String())
|
log.Infof("tip: %v", myDB.LastState.Tip.String())
|
||||||
logrus.Infof("tx count: %v", myDB.LastState.TxCount)
|
log.Infof("tx count: %v", myDB.LastState.TxCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
blockingChannelHashes := make([][]byte, 0, 10)
|
blockingChannelHashes := make([][]byte, 0, 10)
|
||||||
|
@ -186,7 +207,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
|
||||||
for _, id := range args.BlockingChannelIds {
|
for _, id := range args.BlockingChannelIds {
|
||||||
hash, err := hex.DecodeString(id)
|
hash, err := hex.DecodeString(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warn("Invalid channel id: ", id)
|
log.Warn("Invalid channel id: ", id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
blockingChannelHashes = append(blockingChannelHashes, hash)
|
blockingChannelHashes = append(blockingChannelHashes, hash)
|
||||||
|
@ -196,7 +217,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
|
||||||
for _, id := range args.FilteringChannelIds {
|
for _, id := range args.FilteringChannelIds {
|
||||||
hash, err := hex.DecodeString(id)
|
hash, err := hex.DecodeString(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warn("Invalid channel id: ", id)
|
log.Warn("Invalid channel id: ", id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
filteringChannelHashes = append(filteringChannelHashes, hash)
|
filteringChannelHashes = append(filteringChannelHashes, hash)
|
||||||
|
@ -207,10 +228,10 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
|
||||||
myDB.FilteringChannelHashes = filteringChannelHashes
|
myDB.FilteringChannelHashes = filteringChannelHashes
|
||||||
|
|
||||||
if len(filteringIds) > 0 {
|
if len(filteringIds) > 0 {
|
||||||
logrus.Infof("filtering claims reposted by channels: %+s", filteringIds)
|
log.Infof("filtering claims reposted by channels: %+s", filteringIds)
|
||||||
}
|
}
|
||||||
if len(blockingIds) > 0 {
|
if len(blockingIds) > 0 {
|
||||||
logrus.Infof("blocking claims reposted by channels: %+s", blockingIds)
|
log.Infof("blocking claims reposted by channels: %+s", blockingIds)
|
||||||
}
|
}
|
||||||
|
|
||||||
return myDB, nil
|
return myDB, nil
|
||||||
|
@ -234,7 +255,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
|
|
||||||
var client *elastic.Client = nil
|
var client *elastic.Client = nil
|
||||||
if !args.DisableEs {
|
if !args.DisableEs {
|
||||||
esUrl := args.EsHost + ":" + args.EsPort
|
esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
|
||||||
opts := []elastic.ClientOptionFunc{
|
opts := []elastic.ClientOptionFunc{
|
||||||
elastic.SetSniff(true),
|
elastic.SetSniff(true),
|
||||||
elastic.SetSnifferTimeoutStartup(time.Second * 60),
|
elastic.SetSnifferTimeoutStartup(time.Second * 60),
|
||||||
|
@ -242,7 +263,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
elastic.SetURL(esUrl),
|
elastic.SetURL(esUrl),
|
||||||
}
|
}
|
||||||
if args.Debug {
|
if args.Debug {
|
||||||
opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0)))
|
opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
|
||||||
}
|
}
|
||||||
client, err = elastic.NewClient(opts...)
|
client, err = elastic.NewClient(opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -271,7 +292,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
if !args.DisableResolve {
|
if !args.DisableResolve {
|
||||||
myDB, err = LoadDatabase(args, grp)
|
myDB, err = LoadDatabase(args, grp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warning(err)
|
log.Warning(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,7 +323,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
chain := chaincfg.MainNetParams
|
chain := chaincfg.MainNetParams
|
||||||
if dbChain != nil && cliChain != nil {
|
if dbChain != nil && cliChain != nil {
|
||||||
if dbChain != cliChain {
|
if dbChain != cliChain {
|
||||||
logrus.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name)
|
log.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name)
|
||||||
}
|
}
|
||||||
chain = *dbChain
|
chain = *dbChain
|
||||||
} else if dbChain != nil {
|
} else if dbChain != nil {
|
||||||
|
@ -310,7 +331,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
} else if cliChain != nil {
|
} else if cliChain != nil {
|
||||||
chain = *cliChain
|
chain = *cliChain
|
||||||
}
|
}
|
||||||
logrus.Infof("network: %v", chain.Name)
|
log.Infof("network: %v", chain.Name)
|
||||||
|
|
||||||
args.GenesisHash = chain.GenesisHash.String()
|
args.GenesisHash = chain.GenesisHash.String()
|
||||||
|
|
||||||
|
@ -338,34 +359,36 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
ExternalIP: net.IPv4(127, 0, 0, 1),
|
ExternalIP: net.IPv4(127, 0, 0, 1),
|
||||||
HeightSubs: make(map[net.Addr]net.Conn),
|
HeightSubs: make(map[net.Addr]net.Conn),
|
||||||
HeightSubsMut: sync.RWMutex{},
|
HeightSubsMut: sync.RWMutex{},
|
||||||
NotifierChan: make(chan interface{}),
|
NotifierChan: make(chan interface{}, 1),
|
||||||
Grp: grp,
|
Grp: grp,
|
||||||
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
|
sessionManager: nil,
|
||||||
}
|
}
|
||||||
|
// FIXME: HACK
|
||||||
|
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain)
|
||||||
|
|
||||||
// Start up our background services
|
// Start up our background services
|
||||||
if !args.DisableResolve && !args.DisableRocksDBRefresh {
|
if !args.DisableResolve && !args.DisableRocksDBRefresh {
|
||||||
logrus.Info("Running detect changes")
|
log.Info("Running detect changes")
|
||||||
myDB.RunDetectChanges(s.NotifierChan)
|
myDB.RunDetectChanges(s.NotifierChan)
|
||||||
}
|
}
|
||||||
if !args.DisableBlockingAndFiltering {
|
if !args.DisableBlockingAndFiltering {
|
||||||
myDB.RunGetBlocksAndFilters()
|
myDB.RunGetBlocksAndFilters()
|
||||||
}
|
}
|
||||||
if !args.DisableStartPrometheus {
|
if !args.DisableStartPrometheus {
|
||||||
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics")
|
go s.prometheusEndpoint(fmt.Sprintf("%d", s.Args.PrometheusPort), "metrics")
|
||||||
}
|
}
|
||||||
if !args.DisableStartUDP {
|
if !args.DisableStartUDP {
|
||||||
go func() {
|
go func() {
|
||||||
err := s.UDPServer(s.Args.Port)
|
err := s.UDPServer(s.Args.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
|
log.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if s.Args.JSONRPCPort != 0 {
|
if s.Args.JSONRPCPort != 0 {
|
||||||
go func() {
|
go func() {
|
||||||
err := s.UDPServer(s.Args.JSONRPCPort)
|
err := s.UDPServer(s.Args.JSONRPCPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
|
log.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -409,7 +432,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
// for this hub to allow for metric tracking.
|
// for this hub to allow for metric tracking.
|
||||||
func (s *Server) prometheusEndpoint(port string, endpoint string) {
|
func (s *Server) prometheusEndpoint(port string, endpoint string) {
|
||||||
http.Handle("/"+endpoint, promhttp.Handler())
|
http.Handle("/"+endpoint, promhttp.Handler())
|
||||||
log.Println(fmt.Sprintf("listening on :%s /%s", port, endpoint))
|
log.Printf("listening on :%s /%s\n", port, endpoint)
|
||||||
err := http.ListenAndServe(":"+port, nil)
|
err := http.ListenAndServe(":"+port, nil)
|
||||||
log.Fatalln("Shouldn't happen??!?!", err)
|
log.Fatalln("Shouldn't happen??!?!", err)
|
||||||
}
|
}
|
||||||
|
@ -567,7 +590,7 @@ func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs,
|
||||||
BlockedTotal: 0, //TODO
|
BlockedTotal: 0, //TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Warn(res)
|
log.Warn(res)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,6 @@ func (s *Server) GetNumPeersExported() func() int64 {
|
||||||
return s.getNumPeers
|
return s.getNumPeers
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||||
return newSessionManager(db, args, grp, chain)
|
return newSessionManager(server, db, args, grp, chain)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,11 @@ type hashXNotification struct {
|
||||||
statusStr string
|
statusStr string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peerNotification struct {
|
||||||
|
address string
|
||||||
|
port string
|
||||||
|
}
|
||||||
|
|
||||||
type session struct {
|
type session struct {
|
||||||
id uintptr
|
id uintptr
|
||||||
addr net.Addr
|
addr net.Addr
|
||||||
|
@ -41,6 +46,8 @@ type session struct {
|
||||||
hashXSubs map[[HASHX_LEN]byte]string
|
hashXSubs map[[HASHX_LEN]byte]string
|
||||||
// headersSub indicates header subscription
|
// headersSub indicates header subscription
|
||||||
headersSub bool
|
headersSub bool
|
||||||
|
// peersSub indicates peer subscription
|
||||||
|
peersSub bool
|
||||||
// headersSubRaw indicates the header subscription mode
|
// headersSubRaw indicates the header subscription mode
|
||||||
headersSubRaw bool
|
headersSubRaw bool
|
||||||
// client provides the ability to send notifications
|
// client provides the ability to send notifications
|
||||||
|
@ -55,12 +62,11 @@ type session struct {
|
||||||
func (s *session) doNotify(notification interface{}) {
|
func (s *session) doNotify(notification interface{}) {
|
||||||
var method string
|
var method string
|
||||||
var params interface{}
|
var params interface{}
|
||||||
switch notification.(type) {
|
switch note := notification.(type) {
|
||||||
case headerNotification:
|
case headerNotification:
|
||||||
Nice! Nice!
|
|||||||
if !s.headersSub {
|
if !s.headersSub {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
note, _ := notification.(headerNotification)
|
|
||||||
heightHash := note.HeightHash
|
heightHash := note.HeightHash
|
||||||
method = "blockchain.headers.subscribe"
|
method = "blockchain.headers.subscribe"
|
||||||
if s.headersSubRaw {
|
if s.headersSubRaw {
|
||||||
|
@ -80,7 +86,6 @@ func (s *session) doNotify(notification interface{}) {
|
||||||
params = header
|
params = header
|
||||||
}
|
}
|
||||||
case hashXNotification:
|
case hashXNotification:
|
||||||
note, _ := notification.(hashXNotification)
|
|
||||||
orig, ok := s.hashXSubs[note.hashX]
|
orig, ok := s.hashXSubs[note.hashX]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
|
@ -95,6 +100,13 @@ func (s *session) doNotify(notification interface{}) {
|
||||||
status = hex.EncodeToString(note.status)
|
status = hex.EncodeToString(note.status)
|
||||||
}
|
}
|
||||||
```
if !s.peersBool {
return
}
```
|
|||||||
params = []string{orig, status}
|
params = []string{orig, status}
|
||||||
|
case peerNotification:
|
||||||
|
if !s.peersSub {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
method = "server.peers.subscribe"
|
||||||
|
params = []string{note.address, note.port}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.Warnf("unknown notification type: %v", notification)
|
log.Warnf("unknown notification type: %v", notification)
|
||||||
return
|
return
|
||||||
|
@ -126,14 +138,17 @@ type sessionManager struct {
|
||||||
manageTicker *time.Ticker
|
manageTicker *time.Ticker
|
||||||
db *db.ReadOnlyDBColumnFamily
|
db *db.ReadOnlyDBColumnFamily
|
||||||
args *Args
|
args *Args
|
||||||
|
server *Server
|
||||||
chain *chaincfg.Params
|
chain *chaincfg.Params
|
||||||
|
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
|
||||||
|
peerSubs sessionMap
|
||||||
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
||||||
headerSubs sessionMap
|
headerSubs sessionMap
|
||||||
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
|
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
|
||||||
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||||
return &sessionManager{
|
return &sessionManager{
|
||||||
sessions: make(sessionMap),
|
sessions: make(sessionMap),
|
||||||
grp: grp,
|
grp: grp,
|
||||||
|
@ -142,7 +157,9 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou
|
||||||
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
|
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
|
||||||
db: db,
|
db: db,
|
||||||
args: args,
|
args: args,
|
||||||
|
server: server,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
|
peerSubs: make(sessionMap),
|
||||||
headerSubs: make(sessionMap),
|
headerSubs: make(sessionMap),
|
||||||
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
||||||
}
|
}
|
||||||
|
@ -211,8 +228,15 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
|
||||||
log.Errorf("RegisterName: %v\n", err)
|
log.Errorf("RegisterName: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register "server.peers" handlers.
|
||||||
|
peersSvc := &PeersService{Server: sm.server}
|
||||||
|
err = s1.RegisterName("server.peers", peersSvc)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("RegisterName: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Register "blockchain.claimtrie.*"" handlers.
|
// Register "blockchain.claimtrie.*"" handlers.
|
||||||
claimtrieSvc := &ClaimtrieService{sm.db}
|
claimtrieSvc := &ClaimtrieService{sm.db, sm.server}
|
||||||
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("RegisterName: %v\n", err)
|
log.Errorf("RegisterName: %v\n", err)
|
||||||
|
@ -286,6 +310,18 @@ func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) {
|
||||||
|
sm.sessionsMut.Lock()
|
||||||
|
defer sm.sessionsMut.Unlock()
|
||||||
|
if subscribe {
|
||||||
|
sm.peerSubs[sess.id] = sess
|
||||||
|
sess.peersSub = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(sm.peerSubs, sess.id)
|
||||||
|
sess.peersSub = false
|
||||||
|
}
|
||||||
|
|
||||||
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
|
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
|
||||||
sm.sessionsMut.Lock()
|
sm.sessionsMut.Lock()
|
||||||
defer sm.sessionsMut.Unlock()
|
defer sm.sessionsMut.Unlock()
|
||||||
|
@ -325,16 +361,15 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sessionManager) doNotify(notification interface{}) {
|
func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
switch notification.(type) {
|
switch note := notification.(type) {
|
||||||
case internal.HeightHash:
|
case internal.HeightHash:
|
||||||
// The HeightHash notification translates to headerNotification.
|
// The HeightHash notification translates to headerNotification.
|
||||||
notification = &headerNotification{HeightHash: notification.(internal.HeightHash)}
|
notification = &headerNotification{HeightHash: note}
|
||||||
}
|
}
|
||||||
sm.sessionsMut.RLock()
|
sm.sessionsMut.RLock()
|
||||||
var subsCopy sessionMap
|
var subsCopy sessionMap
|
||||||
switch notification.(type) {
|
switch note := notification.(type) {
|
||||||
case headerNotification:
|
case headerNotification:
|
||||||
note, _ := notification.(headerNotification)
|
|
||||||
subsCopy = sm.headerSubs
|
subsCopy = sm.headerSubs
|
||||||
if len(subsCopy) > 0 {
|
if len(subsCopy) > 0 {
|
||||||
hdr := [HEADER_SIZE]byte{}
|
hdr := [HEADER_SIZE]byte{}
|
||||||
|
@ -343,7 +378,6 @@ func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
|
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
|
||||||
}
|
}
|
||||||
case hashXNotification:
|
case hashXNotification:
|
||||||
note, _ := notification.(hashXNotification)
|
|
||||||
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
||||||
if ok {
|
if ok {
|
||||||
subsCopy = hashXSubs
|
subsCopy = hashXSubs
|
||||||
|
@ -351,6 +385,8 @@ func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
if len(subsCopy) > 0 {
|
if len(subsCopy) > 0 {
|
||||||
note.statusStr = hex.EncodeToString(note.status)
|
note.statusStr = hex.EncodeToString(note.status)
|
||||||
}
|
}
|
||||||
|
case peerNotification:
|
||||||
|
subsCopy = sm.peerSubs
|
||||||
default:
|
default:
|
||||||
log.Warnf("unknown notification type: %v", notification)
|
log.Warnf("unknown notification type: %v", notification)
|
||||||
}
|
}
|
||||||
|
@ -369,8 +405,10 @@ type sessionServerCodec struct {
|
||||||
|
|
||||||
// ReadRequestHeader provides ability to rewrite the incoming
|
// ReadRequestHeader provides ability to rewrite the incoming
|
||||||
// request "method" field. For example:
|
// request "method" field. For example:
|
||||||
|
//
|
||||||
// blockchain.block.get_header -> blockchain.block.Get_header
|
// blockchain.block.get_header -> blockchain.block.Get_header
|
||||||
// blockchain.address.listunspent -> blockchain.address.Listunspent
|
// blockchain.address.listunspent -> blockchain.address.Listunspent
|
||||||
|
//
|
||||||
// This makes the "method" string compatible with rpc.Server
|
// This makes the "method" string compatible with rpc.Server
|
||||||
// requirements.
|
// requirements.
|
||||||
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
|
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
|
||||||
|
|
|
@ -49,6 +49,9 @@ func interruptListener() <-chan struct{} {
|
||||||
case sig := <-interruptChannel:
|
case sig := <-interruptChannel:
|
||||||
log.Infof("Received signal (%s). Already "+
|
log.Infof("Received signal (%s). Already "+
|
||||||
"shutting down...", sig)
|
"shutting down...", sig)
|
||||||
|
case <-shutdownRequestChannel:
|
||||||
|
log.Info("Shutdown requested. Already " +
|
||||||
|
"shutting down...")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -10,12 +10,9 @@ package main
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// initsignals sets the signals to be caught by the signal handler
|
// initsignals sets the signals to be caught by the signal handler
|
||||||
func initsignals(stopCh stop.Chan) {
|
func initsignals() {
|
||||||
shutdownRequestChannel = stopCh
|
|
||||||
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||||
}
|
}
|
||||||
|
|
OK, I see why you pass the
server.Server
into the session manager, and also store it inClaimtrieService
.The python hub has a
SearchIndex
class which contains the search functionality of theHubServerService
:75d64f9dc6/hub/herald/search.py (L29)
The
SearchIndex
is passed off to the session manager instead of the wholeHubServerService
:https://github.com/lbryio/hub/search?q=SearchIndex
Low priority to do this now, but splitting the search functionality (elastic search client, caches for search, etc) from
server.Server
into aSearchIndex
struct/obj would be nice.