blockchain.transaction.broadcast implementation (#80)
* Generate secondary hashXNotification(s) on every headerNotification. * Attempt LBCD connection with rpc.Client. * Optional --daemon-url. * Correct HashXStatusKey field. Should be HASHX_LEN. * Connect to lbcd using lbcd/rpcclient library. * Handle deprecation of node.js 12 actions. * Add --daemon-ca-path argument and establish HTTPS connection if specified. * Remove dead code. Tighten definition of TransactionBroadcastReq. * Correct default daemon URL.
This commit is contained in:
parent
711c4b4b7b
commit
d2193e980a
13 changed files with 173 additions and 61 deletions
2
.github/workflows/build-short.yml
vendored
2
.github/workflows/build-short.yml
vendored
|
@ -7,6 +7,6 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v3
|
||||||
- name: Build and Test
|
- name: Build and Test
|
||||||
uses: ./
|
uses: ./
|
||||||
|
|
26
db/db_get.go
26
db/db_get.go
|
@ -93,6 +93,32 @@ func (db *ReadOnlyDBColumnFamily) GetBlockTXs(height uint32) ([]*chainhash.Hash,
|
||||||
return value.TxHashes, nil
|
return value.TxHashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *ReadOnlyDBColumnFamily) GetTouchedHashXs(height uint32) ([][]byte, error) {
|
||||||
|
handle, err := db.EnsureHandle(prefixes.TouchedHashX)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := prefixes.TouchedHashXKey{
|
||||||
|
Prefix: []byte{prefixes.TouchedHashX},
|
||||||
|
Height: height,
|
||||||
|
}
|
||||||
|
slice, err := db.DB.GetCF(db.Opts, handle, key.PackKey())
|
||||||
|
defer slice.Free()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if slice.Size() == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rawValue := make([]byte, len(slice.Data()))
|
||||||
|
copy(rawValue, slice.Data())
|
||||||
|
value := prefixes.TouchedHashXValue{}
|
||||||
|
value.UnpackValue(rawValue)
|
||||||
|
return value.TouchedHashXs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
|
func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
|
||||||
handle, err := db.EnsureHandle(prefixes.Header)
|
handle, err := db.EnsureHandle(prefixes.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -3413,7 +3413,7 @@ func (kv *TouchedHashXValue) UnpackValue(buf []byte) {
|
||||||
|
|
||||||
type HashXStatusKey struct {
|
type HashXStatusKey struct {
|
||||||
Prefix []byte `struct:"[1]byte" json:"prefix"`
|
Prefix []byte `struct:"[1]byte" json:"prefix"`
|
||||||
HashX []byte `struct:"[20]byte" json:"hashX"`
|
HashX []byte `struct:"[11]byte" json:"hashX"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type HashXStatusValue struct {
|
type HashXStatusValue struct {
|
||||||
|
@ -3425,15 +3425,15 @@ func (kv *HashXStatusKey) NumFields() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *HashXStatusKey) PartialPack(fields int) []byte {
|
func (kv *HashXStatusKey) PartialPack(fields int) []byte {
|
||||||
// b'>20s'
|
// b'>20s' (really HASHX_LEN 11 bytes)
|
||||||
n := len(kv.Prefix) + 20
|
n := len(kv.Prefix) + 11
|
||||||
buf := make([]byte, n)
|
buf := make([]byte, n)
|
||||||
offset := 0
|
offset := 0
|
||||||
offset += copy(buf[offset:], kv.Prefix[:1])
|
offset += copy(buf[offset:], kv.Prefix[:1])
|
||||||
if fields <= 0 {
|
if fields <= 0 {
|
||||||
return buf[:offset]
|
return buf[:offset]
|
||||||
}
|
}
|
||||||
offset += copy(buf[offset:], kv.HashX[:20])
|
offset += copy(buf[offset:], kv.HashX[:11])
|
||||||
return buf[:offset]
|
return buf[:offset]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3442,12 +3442,12 @@ func (kv *HashXStatusKey) PackKey() []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *HashXStatusKey) UnpackKey(buf []byte) {
|
func (kv *HashXStatusKey) UnpackKey(buf []byte) {
|
||||||
// b'>20s'
|
// b'>20s' (really HASHX_LEN 11 bytes)
|
||||||
offset := 0
|
offset := 0
|
||||||
kv.Prefix = buf[offset : offset+1]
|
kv.Prefix = buf[offset : offset+1]
|
||||||
offset += 1
|
offset += 1
|
||||||
kv.HashX = buf[offset : offset+20]
|
kv.HashX = buf[offset : offset+11]
|
||||||
offset += 20
|
offset += 11
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *HashXStatusValue) PackValue() []byte {
|
func (kv *HashXStatusValue) PackValue() []byte {
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -27,6 +27,8 @@ require (
|
||||||
require (
|
require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||||
|
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
|
||||||
|
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/golang/protobuf v1.5.2 // indirect
|
github.com/golang/protobuf v1.5.2 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -68,11 +68,13 @@ github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufo
|
||||||
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
||||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
||||||
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o=
|
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o=
|
||||||
|
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
|
||||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
|
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
|
||||||
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
|
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
|
||||||
github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I=
|
github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I=
|
||||||
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
|
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
|
||||||
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
|
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
|
||||||
|
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
|
||||||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
|
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
|
||||||
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
|
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
|
||||||
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
||||||
|
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -25,6 +26,8 @@ type Args struct {
|
||||||
Port int
|
Port int
|
||||||
DBPath string
|
DBPath string
|
||||||
Chain *string
|
Chain *string
|
||||||
|
DaemonURL *url.URL
|
||||||
|
DaemonCAPath string
|
||||||
EsHost string
|
EsHost string
|
||||||
EsPort int
|
EsPort int
|
||||||
PrometheusPort int
|
PrometheusPort int
|
||||||
|
@ -203,10 +206,19 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
environment := GetEnvironmentStandard()
|
environment := GetEnvironmentStandard()
|
||||||
parser := argparse.NewParser("herald", "herald server and client")
|
parser := argparse.NewParser("herald", "herald server and client")
|
||||||
|
|
||||||
serveCmd := parser.NewCommand("serve", "start the hub server")
|
serveCmd := parser.NewCommand("serve", "start the herald server")
|
||||||
searchCmd := parser.NewCommand("search", "claim search")
|
searchCmd := parser.NewCommand("search", "claim search")
|
||||||
dbCmd := parser.NewCommand("db", "db testing")
|
dbCmd := parser.NewCommand("db", "db testing")
|
||||||
|
|
||||||
|
defaultDaemonURL := "http://localhost:9245"
|
||||||
|
if url, ok := environment["DAEMON_URL"]; ok {
|
||||||
|
defaultDaemonURL = url
|
||||||
|
}
|
||||||
|
|
||||||
|
validateURL := func(arg []string) error {
|
||||||
|
_, err := url.Parse(arg[0])
|
||||||
|
return err
|
||||||
|
}
|
||||||
validatePort := func(arg []string) error {
|
validatePort := func(arg []string) error {
|
||||||
_, err := strconv.ParseUint(arg[0], 10, 16)
|
_, err := strconv.ParseUint(arg[0], 10, 16)
|
||||||
return err
|
return err
|
||||||
|
@ -218,6 +230,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
|
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
|
||||||
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
|
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
|
||||||
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
|
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
|
||||||
|
daemonURLStr := parser.String("", "daemon-url", &argparse.Options{Required: false, Help: "URL for rpc to lbrycrd or lbcd, <rpcuser>:<rpcpassword>@<lbcd rpc ip><lbrcd rpc port>.", Validate: validateURL, Default: defaultDaemonURL})
|
||||||
|
daemonCAPath := parser.String("", "daemon-ca-path", &argparse.Options{Required: false, Help: "Path to the lbcd CA file. Use SSL certificate to verify connection to lbcd."})
|
||||||
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
|
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
|
||||||
esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
esPort := parser.Int("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
||||||
prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
prometheusPort := parser.Int("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
||||||
|
@ -277,6 +291,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
*jsonRPCPort = DefaultJSONRPCPort
|
*jsonRPCPort = DefaultJSONRPCPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
daemonURL, err := url.Parse(*daemonURLStr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("URL parse failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
|
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
|
||||||
|
|
||||||
args := &Args{
|
args := &Args{
|
||||||
|
@ -285,6 +304,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
Port: *port,
|
Port: *port,
|
||||||
DBPath: *dbPath,
|
DBPath: *dbPath,
|
||||||
Chain: chain,
|
Chain: chain,
|
||||||
|
DaemonURL: daemonURL,
|
||||||
|
DaemonCAPath: *daemonCAPath,
|
||||||
EsHost: *esHost,
|
EsHost: *esHost,
|
||||||
EsPort: *esPort,
|
EsPort: *esPort,
|
||||||
PrometheusPort: *prometheusPort,
|
PrometheusPort: *prometheusPort,
|
||||||
|
|
|
@ -705,7 +705,7 @@ func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, r
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type TransactionBroadcastReq string
|
type TransactionBroadcastReq [1]string
|
||||||
type TransactionBroadcastResp string
|
type TransactionBroadcastResp string
|
||||||
|
|
||||||
// 'blockchain.transaction.broadcast'
|
// 'blockchain.transaction.broadcast'
|
||||||
|
@ -713,7 +713,7 @@ func (s *BlockchainTransactionService) Broadcast(req *TransactionBroadcastReq, r
|
||||||
if s.sessionMgr == nil {
|
if s.sessionMgr == nil {
|
||||||
return errors.New("no session manager, rpc not supported")
|
return errors.New("no session manager, rpc not supported")
|
||||||
}
|
}
|
||||||
strTx := string(*req)
|
strTx := string((*req)[0])
|
||||||
rawTx, err := hex.DecodeString(strTx)
|
rawTx, err := hex.DecodeString(strTx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -207,7 +207,7 @@ func TestHeadersSubscribe(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
|
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams, nil)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
@ -388,7 +388,7 @@ func TestAddressSubscribe(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
|
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams, nil)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash"
|
"hash"
|
||||||
|
"io/ioutil"
|
||||||
golog "log"
|
golog "log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -22,6 +23,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/meta"
|
"github.com/lbryio/herald.go/meta"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
"github.com/lbryio/lbcd/chaincfg"
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
|
lbcd "github.com/lbryio/lbcd/rpcclient"
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/olivere/elastic/v7"
|
"github.com/olivere/elastic/v7"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -38,6 +40,7 @@ type Server struct {
|
||||||
WeirdCharsRe *regexp.Regexp
|
WeirdCharsRe *regexp.Regexp
|
||||||
DB *db.ReadOnlyDBColumnFamily
|
DB *db.ReadOnlyDBColumnFamily
|
||||||
Chain *chaincfg.Params
|
Chain *chaincfg.Params
|
||||||
|
DaemonClient *lbcd.Client
|
||||||
EsClient *elastic.Client
|
EsClient *elastic.Client
|
||||||
QueryCache *ttlcache.Cache
|
QueryCache *ttlcache.Cache
|
||||||
S256 *hash.Hash
|
S256 *hash.Hash
|
||||||
|
@ -253,7 +256,32 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var client *elastic.Client = nil
|
var lbcdClient *lbcd.Client = nil
|
||||||
|
if args.DaemonURL != nil && args.DaemonURL.Host != "" {
|
||||||
|
var rpcCertificate []byte
|
||||||
|
if args.DaemonCAPath != "" {
|
||||||
|
rpcCertificate, err = ioutil.ReadFile(args.DaemonCAPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to read SSL certificate from path: %v", args.DaemonCAPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Warnf("connecting to lbcd daemon at %v...", args.DaemonURL.Host)
|
||||||
|
password, _ := args.DaemonURL.User.Password()
|
||||||
|
cfg := &lbcd.ConnConfig{
|
||||||
|
Host: args.DaemonURL.Host,
|
||||||
|
User: args.DaemonURL.User.Username(),
|
||||||
|
Pass: password,
|
||||||
|
HTTPPostMode: true,
|
||||||
|
DisableTLS: rpcCertificate == nil,
|
||||||
|
Certificates: rpcCertificate,
|
||||||
|
}
|
||||||
|
lbcdClient, err = lbcd.New(cfg, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("lbcd daemon connection failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var esClient *elastic.Client = nil
|
||||||
if !args.DisableEs {
|
if !args.DisableEs {
|
||||||
esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
|
esUrl := args.EsHost + ":" + fmt.Sprintf("%d", args.EsPort)
|
||||||
opts := []elastic.ClientOptionFunc{
|
opts := []elastic.ClientOptionFunc{
|
||||||
|
@ -265,7 +293,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
if args.Debug {
|
if args.Debug {
|
||||||
opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
|
opts = append(opts, elastic.SetTraceLog(golog.New(os.Stderr, "[[ELASTIC]]", 0)))
|
||||||
}
|
}
|
||||||
client, err = elastic.NewClient(opts...)
|
esClient, err = elastic.NewClient(opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -344,7 +372,8 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
WeirdCharsRe: weirdCharsRe,
|
WeirdCharsRe: weirdCharsRe,
|
||||||
DB: myDB,
|
DB: myDB,
|
||||||
Chain: &chain,
|
Chain: &chain,
|
||||||
EsClient: client,
|
DaemonClient: lbcdClient,
|
||||||
|
EsClient: esClient,
|
||||||
QueryCache: cache,
|
QueryCache: cache,
|
||||||
S256: &s256,
|
S256: &s256,
|
||||||
LastRefreshCheck: time.Now(),
|
LastRefreshCheck: time.Now(),
|
||||||
|
@ -364,7 +393,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
sessionManager: nil,
|
sessionManager: nil,
|
||||||
}
|
}
|
||||||
// FIXME: HACK
|
// FIXME: HACK
|
||||||
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain)
|
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain, lbcdClient)
|
||||||
|
|
||||||
// Start up our background services
|
// Start up our background services
|
||||||
if !args.DisableResolve && !args.DisableRocksDBRefresh {
|
if !args.DisableResolve && !args.DisableRocksDBRefresh {
|
||||||
|
|
|
@ -15,5 +15,5 @@ func (s *Server) GetNumPeersExported() func() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||||
return newSessionManager(server, db, args, grp, chain)
|
return newSessionManager(server, db, args, grp, chain, nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@ import (
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/lbcd/chaincfg"
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
||||||
|
lbcd "github.com/lbryio/lbcd/rpcclient"
|
||||||
|
"github.com/lbryio/lbcd/wire"
|
||||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -140,6 +142,7 @@ type sessionManager struct {
|
||||||
args *Args
|
args *Args
|
||||||
server *Server
|
server *Server
|
||||||
chain *chaincfg.Params
|
chain *chaincfg.Params
|
||||||
|
lbcd *lbcd.Client
|
||||||
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
|
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
|
||||||
peerSubs sessionMap
|
peerSubs sessionMap
|
||||||
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
||||||
|
@ -148,7 +151,7 @@ type sessionManager struct {
|
||||||
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params, lbcd *lbcd.Client) *sessionManager {
|
||||||
return &sessionManager{
|
return &sessionManager{
|
||||||
sessions: make(sessionMap),
|
sessions: make(sessionMap),
|
||||||
grp: grp,
|
grp: grp,
|
||||||
|
@ -159,6 +162,7 @@ func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args
|
||||||
args: args,
|
args: args,
|
||||||
server: server,
|
server: server,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
|
lbcd: lbcd,
|
||||||
peerSubs: make(sessionMap),
|
peerSubs: make(sessionMap),
|
||||||
headerSubs: make(sessionMap),
|
headerSubs: make(sessionMap),
|
||||||
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
||||||
|
@ -306,8 +310,12 @@ func (sm *sessionManager) removeSessionLocked(sess *session) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
|
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
|
||||||
// TODO
|
var msgTx wire.MsgTx
|
||||||
return nil, nil
|
err := msgTx.Deserialize(bytes.NewReader(rawTx))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sm.lbcd.SendRawTransaction(&msgTx, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) {
|
func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) {
|
||||||
|
@ -366,10 +374,12 @@ func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
// The HeightHash notification translates to headerNotification.
|
// The HeightHash notification translates to headerNotification.
|
||||||
notification = &headerNotification{HeightHash: note}
|
notification = &headerNotification{HeightHash: note}
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.sessionsMut.RLock()
|
sm.sessionsMut.RLock()
|
||||||
var subsCopy sessionMap
|
var subsCopy sessionMap
|
||||||
switch note := notification.(type) {
|
switch note := notification.(type) {
|
||||||
case headerNotification:
|
case headerNotification:
|
||||||
|
log.Infof("header notification @ %#v", note)
|
||||||
subsCopy = sm.headerSubs
|
subsCopy = sm.headerSubs
|
||||||
if len(subsCopy) > 0 {
|
if len(subsCopy) > 0 {
|
||||||
hdr := [HEADER_SIZE]byte{}
|
hdr := [HEADER_SIZE]byte{}
|
||||||
|
@ -378,6 +388,7 @@ func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
|
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
|
||||||
}
|
}
|
||||||
case hashXNotification:
|
case hashXNotification:
|
||||||
|
log.Infof("hashX notification @ %#v", note)
|
||||||
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
||||||
if ok {
|
if ok {
|
||||||
subsCopy = hashXSubs
|
subsCopy = hashXSubs
|
||||||
|
@ -396,6 +407,27 @@ func (sm *sessionManager) doNotify(notification interface{}) {
|
||||||
for _, sess := range subsCopy {
|
for _, sess := range subsCopy {
|
||||||
sess.doNotify(notification)
|
sess.doNotify(notification)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Produce secondary hashXNotification(s) corresponding to the headerNotification.
|
||||||
|
switch note := notification.(type) {
|
||||||
|
case headerNotification:
|
||||||
|
touched, err := sm.db.GetTouchedHashXs(uint32(note.Height))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get touched hashXs at height %v, error: %v", note.Height, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, hashX := range touched {
|
||||||
|
hashXstatus, err := sm.db.GetStatus(hashX)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get status of hashX %v, error: %v", hashX, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
note2 := hashXNotification{}
|
||||||
|
copy(note2.hashX[:], hashX)
|
||||||
|
note2.status = hashXstatus
|
||||||
|
sm.doNotify(note2)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type sessionServerCodec struct {
|
type sessionServerCodec struct {
|
||||||
|
|
40
testdata/f.csv
vendored
40
testdata/f.csv
vendored
|
@ -1,21 +1,21 @@
|
||||||
f,
|
f,
|
||||||
660d649ba1defa4ab5ab71f8a977d7f7cedb11056e,919be5811844077f4660af66afa9a59a5ad17cf5c541524e780fe2137bfa250c
|
660d649ba1defa4ab5ab71f8,919be5811844077f4660af66afa9a59a5ad17cf5c541524e780fe2137bfa250c
|
||||||
6623c6895027f70a5330bbcb1153d635abcb4d5224,8dadcde1a6f676d4004eacd399f825006ddf136d1e92b1c92113377b3e1741b4
|
6623c6895027f70a5330bbcb,8dadcde1a6f676d4004eacd399f825006ddf136d1e92b1c92113377b3e1741b4
|
||||||
664f095b24484ebce8f31fbf008e63cc4aa163d401,c0c4a751f569c1f9c01531f57ba674b2ad2338d9c08f9e9fc85b0209d15466b2
|
664f095b24484ebce8f31fbf,c0c4a751f569c1f9c01531f57ba674b2ad2338d9c08f9e9fc85b0209d15466b2
|
||||||
665201a38de7d7243df717c9f9279cdd30105f0f77,d9293577cc0d51fe3a5bee78fea9b2b2222e6c2aa0d26a4ef4bfb7dd095587e8
|
665201a38de7d7243df717c9,d9293577cc0d51fe3a5bee78fea9b2b2222e6c2aa0d26a4ef4bfb7dd095587e8
|
||||||
665328b2449e537b0ca4733f87ac5ebcdf033c5ebd,624f80a361e47c7eb1b815e8714a40f67b4f642a5546547a3fcb5bf5593d8fab
|
665328b2449e537b0ca4733f,624f80a361e47c7eb1b815e8714a40f67b4f642a5546547a3fcb5bf5593d8fab
|
||||||
665ec882021f55b1fbaa5fad00df5c5d07633b7af3,1e917fbc04385290d654f711bdef12773dd54b6b5ea26fe2a9d58ed051f2cb7f
|
665ec882021f55b1fbaa5fad,1e917fbc04385290d654f711bdef12773dd54b6b5ea26fe2a9d58ed051f2cb7f
|
||||||
6671c131cd433750ba6d3908150ca4910841164b74,a2ebfbdf7a23024c340a45f201645aa46f48bc1fdd8d34ed83fcffbf1ee90523
|
6671c131cd433750ba6d3908,a2ebfbdf7a23024c340a45f201645aa46f48bc1fdd8d34ed83fcffbf1ee90523
|
||||||
667fb93d9ae877ba11f337f21422b0679852580802,4710649e06619e13250754937e9c17c20b07434751171aac2f2f78b184aa0146
|
667fb93d9ae877ba11f337f2,4710649e06619e13250754937e9c17c20b07434751171aac2f2f78b184aa0146
|
||||||
668ed5f39a5db059dc3261377f2a47728f7a357d33,8dd8ca749b87f43e290904749a546fe319c9d53e765f065bb8beb234a117655e
|
668ed5f39a5db059dc326137,8dd8ca749b87f43e290904749a546fe319c9d53e765f065bb8beb234a117655e
|
||||||
66951782f6ba94f2b71e46d0cc4a2411b14d81eb70,4f5c9434dd0886c57c2530991cebd973e1b50d5ba8fcfc019e54561217a49bbb
|
66951782f6ba94f2b71e46d0,4f5c9434dd0886c57c2530991cebd973e1b50d5ba8fcfc019e54561217a49bbb
|
||||||
66970565dfe2b01cad49b73a085a3c3f7a3be61c4c,f6ca0ae18c896d9bc97c5a9d0c3a06256485f59c77fb91780b213f933b80f48b
|
66970565dfe2b01cad49b73a,f6ca0ae18c896d9bc97c5a9d0c3a06256485f59c77fb91780b213f933b80f48b
|
||||||
669f6a30a6712062da0cc27181845c04d7430abf73,5c6604bfd63b871daceb7893dd618850458974fe4108871c1a1323fb8ae34e4e
|
669f6a30a6712062da0cc271,5c6604bfd63b871daceb7893dd618850458974fe4108871c1a1323fb8ae34e4e
|
||||||
66a9a7b89b78553592acf3dfc417c1d7654dab3273,0561f28c3a5ea0027ecb3c53fa068772a6b7cb73d23104a14f9aba8cd1f070a2
|
66a9a7b89b78553592acf3df,0561f28c3a5ea0027ecb3c53fa068772a6b7cb73d23104a14f9aba8cd1f070a2
|
||||||
66aba81567ba48f001f843f01354d575c2e2687847,b0f6ae2c1db8263f7e11fc79423109e718d1f3c30bd123c4243401b5e4f1fee6
|
66aba81567ba48f001f843f0,b0f6ae2c1db8263f7e11fc79423109e718d1f3c30bd123c4243401b5e4f1fee6
|
||||||
66b569cc3d28be4466fb28d147f66d6d8769598964,ecee392ad8217f325508ba38d280436fb0a520b79a9627e5e18197bf55540885
|
66b569cc3d28be4466fb28d1,ecee392ad8217f325508ba38d280436fb0a520b79a9627e5e18197bf55540885
|
||||||
66d4662cd100d66055917d6342d48f49d948fcc255,5762a8ac767fa30d2ca76db7081f8a2e4f5da4f0bf92d29e1322da9a154cc3d6
|
66d4662cd100d66055917d63,5762a8ac767fa30d2ca76db7081f8a2e4f5da4f0bf92d29e1322da9a154cc3d6
|
||||||
66d6fa6ac71d0255dd3f185de6480d5b4316b6b050,5fc193e5e51b3bd8e95f4eb9df63236da7abf678fc47c0b339ceb5c127d0f488
|
66d6fa6ac71d0255dd3f185d,5fc193e5e51b3bd8e95f4eb9df63236da7abf678fc47c0b339ceb5c127d0f488
|
||||||
66e5b6c7c231a02a32eedd8383a5750fd135244a03,58c70ffbfada12550f24bf7931cee06eb2e267dec3560e2e46843e383415f163
|
66e5b6c7c231a02a32eedd83,58c70ffbfada12550f24bf7931cee06eb2e267dec3560e2e46843e383415f163
|
||||||
66e673cce02c2163f756491ef05d7535ceb578e215,b8db43d1f6e62361e2e3b8fa765f79c08ddfb3035caa06f8250d6d1b063a7140
|
66e673cce02c2163f756491e,b8db43d1f6e62361e2e3b8fa765f79c08ddfb3035caa06f8250d6d1b063a7140
|
||||||
66fc4ad75184e6029c805d9494eed4e81be770c002,fc7ac5e785f73732d95183d6bdc3423d41a074fc3f04b1304bae1efa652edde1
|
66fc4ad75184e6029c805d94,fc7ac5e785f73732d95183d6bdc3423d41a074fc3f04b1304bae1efa652edde1
|
||||||
|
|
|
40
testdata/g.csv
vendored
40
testdata/g.csv
vendored
|
@ -1,21 +1,21 @@
|
||||||
g,
|
g,
|
||||||
6702c124856d5168381a32971d8933440a1728fc41,575696fd653a4de2f9a8c1f580cf0c229631b0f5d95fceb354cda133e2eb2d34
|
6702c124856d5168381a3297,575696fd653a4de2f9a8c1f580cf0c229631b0f5d95fceb354cda133e2eb2d34
|
||||||
6707f1511e3a2cb28493f91b85e9e4a9d9d07c86a5,ba368e0f859ee36da8701df1c0b52cbf0c0f8a4b1a91f6d0db83a408f5a937d1
|
6707f1511e3a2cb28493f91b,ba368e0f859ee36da8701df1c0b52cbf0c0f8a4b1a91f6d0db83a408f5a937d1
|
||||||
6707fd4213cae8d5342a98ba49b255fa80b2a9a6e4,bd3a44d30f66444f8732119bc7e0cf0bb47f8f0ab2840987fc06b629f3e6d3f4
|
6707fd4213cae8d5342a98ba,bd3a44d30f66444f8732119bc7e0cf0bb47f8f0ab2840987fc06b629f3e6d3f4
|
||||||
6710294a5693224a6222404ba45fd38eb2e77979a4,de35a8ea0a26d17445e2f509db23188961b5cd1229b96d2411565adf63731b5c
|
6710294a5693224a6222404b,de35a8ea0a26d17445e2f509db23188961b5cd1229b96d2411565adf63731b5c
|
||||||
6716a9f84e02143b50d9034aec126b12d7f2708cc4,5823640ae4529f8df2dab20386c887d0a1ba1ffa4583b99dff761c01f670c2fa
|
6716a9f84e02143b50d9034a,5823640ae4529f8df2dab20386c887d0a1ba1ffa4583b99dff761c01f670c2fa
|
||||||
672e51bc65c9b97d482b0b720e6cb673c41fe7b5c5,0687df449bd8cb8d8f526f4189973d084d786ab0927d81c127f56b03c61aa955
|
672e51bc65c9b97d482b0b72,0687df449bd8cb8d8f526f4189973d084d786ab0927d81c127f56b03c61aa955
|
||||||
67682620db65932047689e5eaf392d6b85be801864,b262d40758edb28d1c04fa3a24d8268990516de6846ad94d002ce55640866239
|
67682620db65932047689e5e,b262d40758edb28d1c04fa3a24d8268990516de6846ad94d002ce55640866239
|
||||||
676e8c320dbbf5eebc2969a93fbc51dd7f6062a7d1,c9e2a8e7181a70e2a488b884c8baadb4043a075c6876cb012c67fbec5aa9f615
|
676e8c320dbbf5eebc2969a9,c9e2a8e7181a70e2a488b884c8baadb4043a075c6876cb012c67fbec5aa9f615
|
||||||
6772e2ac48891ee3c2c727835702a374ad0cb70fd6,985a9c9ee7a0626d78dab431e663289762ce6959be314f91f7b08b1466097fd6
|
6772e2ac48891ee3c2c72783,985a9c9ee7a0626d78dab431e663289762ce6959be314f91f7b08b1466097fd6
|
||||||
67847dd1dac117b85d1e20d93580cdf42f00001a77,62e6b1b8c2961703a90276dcde6dad182b2d14e23f27dccc927cca7770b9890e
|
67847dd1dac117b85d1e20d9,62e6b1b8c2961703a90276dcde6dad182b2d14e23f27dccc927cca7770b9890e
|
||||||
678f49948c72b7295f12092a24d300eeff894f1dd7,2e7c456dac5206c5627736924e96ac016a09a88ec5f4835fbe0cf9e294611c88
|
678f49948c72b7295f12092a,2e7c456dac5206c5627736924e96ac016a09a88ec5f4835fbe0cf9e294611c88
|
||||||
67948b9633ab2ec07d7525936254e66f8c957d026c,66b5c54b3a685de3ea18f9e69254eec065eb3207ac1f93494fdcd585e9a267a0
|
67948b9633ab2ec07d752593,66b5c54b3a685de3ea18f9e69254eec065eb3207ac1f93494fdcd585e9a267a0
|
||||||
679674c162db8d3bb57c434fe87825625c4d4daf63,05425880d80258f7441859b3494415a3fd7398c9e209a19674abd48372b283c6
|
679674c162db8d3bb57c434f,05425880d80258f7441859b3494415a3fd7398c9e209a19674abd48372b283c6
|
||||||
67a8d3f17df85502bd644a364721e6364d61635b73,1efce69a3a05c505e9f9cc5c2241d02099c043d934389b430fd8b185e6dfe6cb
|
67a8d3f17df85502bd644a36,1efce69a3a05c505e9f9cc5c2241d02099c043d934389b430fd8b185e6dfe6cb
|
||||||
67bad7f4fb3c6828b6fc4624d43786fc8f55d6eb0f,04a1c0a7ffe7acbf974ca18cf3debbd8e1be3d6703f842f57ef14af6d4c336d3
|
67bad7f4fb3c6828b6fc4624,04a1c0a7ffe7acbf974ca18cf3debbd8e1be3d6703f842f57ef14af6d4c336d3
|
||||||
67c13fb0c65acca5520bc2f59bd91ca3482dbec156,7fdc6989cd778baad45cd98358ea060237b169a4aeaeb14da6ac4686b7858c9f
|
67c13fb0c65acca5520bc2f5,7fdc6989cd778baad45cd98358ea060237b169a4aeaeb14da6ac4686b7858c9f
|
||||||
67d4314588b4424b0ee026536b9bd7857f11cab2ee,c63fd7a85a533b8591577bab805104708ba5458fab0e343d46b3e24a28b92cb5
|
67d4314588b4424b0ee02653,c63fd7a85a533b8591577bab805104708ba5458fab0e343d46b3e24a28b92cb5
|
||||||
67d734244f85f32a58e34e2d9cadf225a56973d32f,d19a6307c24470b3973973319770bdb896218bb58d1f2d07c7226266075057d0
|
67d734244f85f32a58e34e2d,d19a6307c24470b3973973319770bdb896218bb58d1f2d07c7226266075057d0
|
||||||
67d9c159c5d5e407e6b0a4cacf9d6fe62a55b0fedc,89cbdb903fdfe0b44e74b0a69eed3de7029f18c28f77e5509f8ace766ab86610
|
67d9c159c5d5e407e6b0a4ca,89cbdb903fdfe0b44e74b0a69eed3de7029f18c28f77e5509f8ace766ab86610
|
||||||
67fafc73d674250f11e559ab08b287f5714e531761,1752ffbf9807bb2e4e480bf045b4bacc472befe755287384b5a526065a58c065
|
67fafc73d674250f11e559ab,1752ffbf9807bb2e4e480bf045b4bacc472befe755287384b5a526065a58c065
|
||||||
|
|
|
Loading…
Reference in a new issue