Server endpoints goroutine refactor #69

Merged
jeffreypicard merged 18 commits from server-endpoints-goroutine-refactor into master 2022-10-25 07:48:13 +02:00
21 changed files with 546 additions and 307 deletions

View file

@ -8,7 +8,6 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"os" "os"
"sync"
"time" "time"
"github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/db/prefixes"
@ -16,6 +15,7 @@ import (
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct {
BlockedChannels map[string][]byte BlockedChannels map[string][]byte
FilteredStreams map[string][]byte FilteredStreams map[string][]byte
FilteredChannels map[string][]byte FilteredChannels map[string][]byte
moodyjon commented 2022-10-14 16:14:51 +02:00 (Migrated from github.com)
Review

OpenIterators, ItMut not needed anymore.

`OpenIterator`s, `ItMut` not needed anymore.
OpenIterators map[string][]chan struct{} Grp *stop.Group
ItMut sync.RWMutex
ShutdownChan chan struct{}
DoneChan chan struct{}
ShutdownCalled bool
Cleanup func() Cleanup func()
} }
@ -339,19 +335,10 @@ func interruptRequested(interrupted <-chan struct{}) bool {
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV) ch := make(chan *prefixes.PrefixRowKV)
iterKey := fmt.Sprintf("%p", opts) // Check if we've been told to shutdown in between getting created and getting here
if opts.DB != nil { if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
opts.DB.ItMut.Lock() opts.Grp.Done()
moodyjon commented 2022-10-14 16:02:16 +02:00 (Migrated from github.com)
Review

if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {

This simplification eliminating ShutdownCalled is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in opts.WithDB()

`if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {` This simplification eliminating `ShutdownCalled` is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in `opts.WithDB()`
// There is a tiny chance that we were wating on the above lock while shutdown was return ch
// being called and by the time we get it the db has already notified all active
// iterators to shutdown. In this case we go to the else branch.
if !opts.DB.ShutdownCalled {
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
opts.DB.ItMut.Unlock()
} else {
opts.DB.ItMut.Unlock()
return ch
}
} }
ro := grocksdb.NewDefaultReadOptions() ro := grocksdb.NewDefaultReadOptions()
@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
it.Close() it.Close()
close(ch) close(ch)
ro.Destroy() ro.Destroy()
if opts.DB != nil { if opts.Grp != nil {
opts.DoneChan <- struct{}{} // opts.Grp.DoneNamed(iterKey)
opts.DB.ItMut.Lock() opts.Grp.Done()
delete(opts.DB.OpenIterators, iterKey)
opts.DB.ItMut.Unlock()
} }
}() }()
@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
if kv = opts.ReadRow(&prevKey); kv != nil { if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv ch <- kv
} }
if interruptRequested(opts.ShutdownChan) { if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
return return
} }
} }
@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
} }
// GetProdDB returns a db that is used for production. // GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) { func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
prefixNames := prefixes.GetPrefixes() prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly // additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"} cfNames := []string{"default", "e", "d", "c"}
@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
cfNames = append(cfNames, cfName) cfNames = append(cfNames, cfName)
} }
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames) db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
cleanupFiles := func() { cleanupFiles := func() {
err = os.RemoveAll(secondaryPath) err = os.RemoveAll(secondaryPath)
@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
} }
if err != nil { if err != nil {
return nil, cleanupFiles, err cleanupFiles()
return nil, err
} }
cleanupDB := func() { cleanupDB := func() {
@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
} }
db.Cleanup = cleanupDB db.Cleanup = cleanupDB
return db, cleanupDB, nil return db, nil
} }
// GetDBColumnFamilies gets a db with the specified column families and secondary path. // GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) { func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions() opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions() roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions() cfOpt := grocksdb.NewDefaultOptions()
@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
LastState: nil, LastState: nil,
Height: 0, Height: 0,
Headers: nil, Headers: nil,
OpenIterators: make(map[string][]chan struct{}), Grp: grp,
ItMut: sync.RWMutex{},
ShutdownChan: make(chan struct{}, 1),
ShutdownCalled: false,
DoneChan: make(chan struct{}, 1),
} }
err = myDB.ReadDBState() //TODO: Figure out right place for this err = myDB.ReadDBState() //TODO: Figure out right place for this
@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db. // Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() { func (db *ReadOnlyDBColumnFamily) Shutdown() {
log.Println("Sending message to ShutdownChan...") db.Grp.StopAndWait()
db.ShutdownChan <- struct{}{}
log.Println("Locking iterator mutex...")
db.ItMut.Lock()
log.Println("Setting ShutdownCalled to true...")
db.ShutdownCalled = true
log.Println("Notifying iterators to shutdown...")
for _, it := range db.OpenIterators {
it[1] <- struct{}{}
}
log.Println("Waiting for iterators to shutdown...")
for _, it := range db.OpenIterators {
<-it[0]
}
log.Println("Unlocking iterator mutex...")
db.ItMut.Unlock()
log.Println("Sending message to DoneChan...")
<-db.DoneChan
log.Println("Calling cleanup...") log.Println("Calling cleanup...")
db.Cleanup() db.Cleanup()
log.Println("Leaving Shutdown...") log.Println("Leaving Shutdown...")
@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
// to keep the db readonly view up to date and handle reorgs on the // to keep the db readonly view up to date and handle reorgs on the
// blockchain. // blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
db.Grp.Add(1)
go func() { go func() {
lastPrint := time.Now() lastPrint := time.Now()
for { for {
@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
log.Infof("Error detecting changes: %#v", err) log.Infof("Error detecting changes: %#v", err)
} }
select { select {
case <-db.ShutdownChan: case <-db.Grp.Ch():
db.DoneChan <- struct{}{} db.Grp.Done()
return return
case <-time.After(time.Millisecond * 10): case <-time.After(time.Millisecond * 10):
} }

View file

@ -7,12 +7,12 @@ import (
"log" "log"
"os" "os"
"strings" "strings"
"sync"
"testing" "testing"
dbpkg "github.com/lbryio/herald.go/db" dbpkg "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
) )
@ -21,7 +21,7 @@ import (
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names // OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) { func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
log.Println(filePath) log.Println(filePath)
file, err := os.Open(filePath) file, err := os.Open(filePath)
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
reader := csv.NewReader(file) reader := csv.NewReader(file)
records, err := reader.ReadAll() records, err := reader.ReadAll()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
wOpts := grocksdb.NewDefaultWriteOptions() wOpts := grocksdb.NewDefaultWriteOptions()
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
opts.SetCreateIfMissing(true) opts.SetCreateIfMissing(true)
db, err := grocksdb.OpenDb(opts, "tmp") db, err := grocksdb.OpenDb(opts, "tmp")
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle) var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
log.Println(cfName) log.Println(cfName)
handle, err := db.CreateColumnFamily(opts, cfName) handle, err := db.CreateColumnFamily(opts, cfName)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
handleMap[cfName] = handle handleMap[cfName] = handle
} }
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
for _, record := range records[1:] { for _, record := range records[1:] {
cf := record[0] cf := record[0]
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
handle := handleMap[string(cf)] handle := handleMap[string(cf)]
key, err := hex.DecodeString(record[1]) key, err := hex.DecodeString(record[1])
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
val, err := hex.DecodeString(record[2]) val, err := hex.DecodeString(record[2])
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
db.PutCF(wOpts, handle, key, val) db.PutCF(wOpts, handle, key, val)
} }
@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
LastState: nil, LastState: nil,
Height: 0, Height: 0,
Headers: nil, Headers: nil,
OpenIterators: make(map[string][]chan struct{}), Grp: stop.New(),
ItMut: sync.RWMutex{}, Cleanup: toDefer,
ShutdownChan: make(chan struct{}, 1),
DoneChan: make(chan struct{}, 1),
ShutdownCalled: false,
} }
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
err = myDB.InitTxCounts() err = myDB.InitTxCounts()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
// err = dbpkg.InitHeaders(myDB) // err = dbpkg.InitHeaders(myDB)
@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
// return nil, nil, nil, err // return nil, nil, nil, err
// } // }
return myDB, records, toDefer, nil return myDB, records, nil
} }
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file // OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
@ -247,17 +244,18 @@ func CatCSV(filePath string) {
func TestCatFullDB(t *testing.T) { func TestCatFullDB(t *testing.T) {
t.Skip("Skipping full db test") t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
// url := "lbry://@lbry" // url := "lbry://@lbry"
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
defer toDefer()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) {
// TestOpenFullDB Tests running a resolve on a full db. // TestOpenFullDB Tests running a resolve on a full db.
func TestOpenFullDB(t *testing.T) { func TestOpenFullDB(t *testing.T) {
t.Skip("Skipping full db test") t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) {
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
// url := "lbry://@lbry$1" // url := "lbry://@lbry$1"
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c" url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer toDefer() defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) {
func TestResolve(t *testing.T) { func TestResolve(t *testing.T) {
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
filePath := "../testdata/FULL_resolve.csv" filePath := "../testdata/FULL_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
moodyjon commented 2022-10-19 16:18:32 +02:00 (Migrated from github.com)
Review

Have OpenAndFillTmpDBColumnFamilies() assign db.Cleanup = toDefer inside to simplify this.

...Ditto all the other places calling OpenAndFillTmpDBColumnFamilies().

Have `OpenAndFillTmpDBColumnFamilies()` assign `db.Cleanup = toDefer` inside to simplify this. ...Ditto all the other places calling `OpenAndFillTmpDBColumnFamilies()`.
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
defer toDefer()
expandedResolveResult := db.Resolve(url) expandedResolveResult := db.Resolve(url)
log.Printf("%#v\n", expandedResolveResult) log.Printf("%#v\n", expandedResolveResult)
if expandedResolveResult != nil && expandedResolveResult.Channel != nil { if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
@ -321,11 +321,11 @@ func TestResolve(t *testing.T) {
func TestGetDBState(t *testing.T) { func TestGetDBState(t *testing.T) {
filePath := "../testdata/s_resolve.csv" filePath := "../testdata/s_resolve.csv"
want := uint32(1072108) want := uint32(1072108)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
state, err := db.GetDBState() state, err := db.GetDBState()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
// Should be non-existent // Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/W_resolve.csv" filePath := "../testdata/W_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
count, err := db.GetRepostedCount(channelHash) count, err := db.GetRepostedCount(channelHash)
if err != nil { if err != nil {
@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
// Should be non-existent // Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/j_resolve.csv" filePath := "../testdata/j_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
count, err := db.GetRepostedCount(channelHash) count, err := db.GetRepostedCount(channelHash)
if err != nil { if err != nil {
@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) {
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2") channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
filePath := "../testdata/V_resolve.csv" filePath := "../testdata/V_resolve.csv"
// want := uint32(3670) // want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
res, err := db.GetRepost(channelHash) res, err := db.GetRepost(channelHash)
if err != nil { if err != nil {
@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd") channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
filePath := "../testdata/Z_resolve.csv" filePath := "../testdata/Z_resolve.csv"
want := uint32(3670) want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
count, err := db.GetClaimsInChannelCount(channelHash) count, err := db.GetClaimsInChannelCount(channelHash)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
var position uint16 = 0 var position uint16 = 0
filePath := "../testdata/F_resolve.csv" filePath := "../testdata/F_resolve.csv"
log.Println(filePath) log.Println(filePath)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position) shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
filePath := "../testdata/F_cat.csv" filePath := "../testdata/F_cat.csv"
normalName := "cat" normalName := "cat"
claimId := "0" claimId := "0"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
ch := db.ClaimShortIdIter(normalName, claimId) ch := db.ClaimShortIdIter(normalName, claimId)
@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
var txNum uint32 = 1456296 var txNum uint32 = 1456296
var position uint16 = 0 var position uint16 = 0
filePath := "../testdata/G_2.csv" filePath := "../testdata/G_2.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
val, err := db.GetCachedClaimHash(txNum, position) val, err := db.GetCachedClaimHash(txNum, position)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
var val []byte = nil var val []byte = nil
filePath := "../testdata/I_resolve.csv" filePath := "../testdata/I_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
val, err = db.GetChannelForClaim(claimHash, txNum, position) val, err = db.GetChannelForClaim(claimHash, txNum, position)
if err != nil { if err != nil {
@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
want := uint64(20000006) want := uint64(20000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr) claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
db.Height = 999999999 db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, true) amount, err := db.GetEffectiveAmount(claimHash, true)
@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
want := uint64(21000006) want := uint64(21000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr) claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
db.Height = 999999999 db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, false) amount, err := db.GetEffectiveAmount(claimHash, false)
@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
t.Error(err) t.Error(err)
} }
filePath := "../testdata/a_resolve.csv" filePath := "../testdata/a_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
res, err := db.GetSupportAmount(claimHash) res, err := db.GetSupportAmount(claimHash)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) {
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde" want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
filePath := "../testdata/X_resolve.csv" filePath := "../testdata/X_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
resHash, err := db.GetTxHash(txNum) resHash, err := db.GetTxHash(txNum)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) {
txNum := uint32(0x6284e3) txNum := uint32(0x6284e3)
position := uint16(0x0) position := uint16(0x0)
want := uint32(0xa6b65) want := uint32(0xa6b65)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defer toDefer()
activation, err := db.GetActivation(txNum, position) activation, err := db.GetActivation(txNum, position)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
return return
} }
filePath := "../testdata/E_resolve.csv" filePath := "../testdata/E_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
defer toDefer()
res, err := db.GetCachedClaimTxo(claimHash, true) res, err := db.GetCachedClaimTxo(claimHash, true)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
claimName := internal.NormalizeName("@Styxhexenhammer666") claimName := internal.NormalizeName("@Styxhexenhammer666")
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd" claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
filePath := "../testdata/P_resolve.csv" filePath := "../testdata/P_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
defer toDefer()
res, err := db.GetControllingClaim(claimName) res, err := db.GetControllingClaim(claimName)
if err != nil { if err != nil {
t.Error(err) t.Error(err)

View file

@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -22,12 +23,11 @@ type IterOptions struct {
IncludeValue bool IncludeValue bool
RawKey bool RawKey bool
RawValue bool RawValue bool
ShutdownChan chan struct{} Grp *stop.Group
DoneChan chan struct{} // DB *ReadOnlyDBColumnFamily
DB *ReadOnlyDBColumnFamily CfHandle *grocksdb.ColumnFamilyHandle
CfHandle *grocksdb.ColumnFamilyHandle It *grocksdb.Iterator
It *grocksdb.Iterator Serializer *prefixes.SerializationAPI
Serializer *prefixes.SerializationAPI
} }
// NewIterateOptions creates a defualt options structure for a db iterator. // NewIterateOptions creates a defualt options structure for a db iterator.
@ -43,12 +43,11 @@ func NewIterateOptions() *IterOptions {
IncludeValue: false, IncludeValue: false,
RawKey: false, RawKey: false,
RawValue: false, RawValue: false,
ShutdownChan: make(chan struct{}, 1), Grp: nil,
DoneChan: make(chan struct{}, 1), // DB: nil,
DB: nil, CfHandle: nil,
CfHandle: nil, It: nil,
It: nil, Serializer: prefixes.ProductionAPI,
Serializer: prefixes.ProductionAPI,
} }
} }
@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
} }
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
o.DB = db // o.Grp.AddNamed(1, iterKey)
o.Grp = stop.New(db.Grp)
o.Grp.Add(1)
return o return o
} }

5
go.mod
View file

@ -24,9 +24,6 @@ require (
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
) )
require (
)
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
@ -43,7 +40,7 @@ require (
github.com/prometheus/procfs v0.6.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect

4
go.sum
View file

@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs= github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g= github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA= github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c= github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E= github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0= github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs= github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0= github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4= github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=

12
main.go
View file

@ -10,6 +10,7 @@ import (
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/herald.go/server" "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -27,13 +28,16 @@ func main() {
if args.CmdType == server.ServeCmd { if args.CmdType == server.ServeCmd {
// This will cancel goroutines with the server finishes. // This will cancel goroutines with the server finishes.
ctxWCancel, cancel := context.WithCancel(ctx) // ctxWCancel, cancel := context.WithCancel(ctx)
defer cancel() // defer cancel()
stopGroup := stop.New()
// defer stopGroup.Stop()
initsignals() initsignals(stopGroup.Ch())
interrupt := interruptListener() interrupt := interruptListener()
s := server.MakeHubServer(ctxWCancel, args) // s := server.MakeHubServer(ctxWCancel, args)
s := server.MakeHubServer(stopGroup, args)
go s.Run() go s.Run()
defer func() { defer func() {

View file

@ -88,7 +88,7 @@ test_command_with_want
### blockchain.block.get_chunk ### blockchain.block.get_chunk
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}' --data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
| jq .result | sed 's/"//g' | head -c 100 | jq .result | sed 's/"//g' | head -c 100
EOM EOM
@ -97,7 +97,7 @@ test_command_with_want
### blockchain.block.get_header ### blockchain.block.get_header
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}' --data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
| jq .result.timestamp | jq .result.timestamp
EOM EOM
@ -106,7 +106,7 @@ test_command_with_want
### blockchain.block.headers ### blockchain.block.headers
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}' --data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
| jq .result.count | jq .result.count
EOM EOM
@ -116,7 +116,7 @@ test_command_with_want
## blockchain.claimtrie ## blockchain.claimtrie
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}' --data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
| jq .result.txos[0].tx_hash | sed 's/"//g' | jq .result.txos[0].tx_hash | sed 's/"//g'
EOM EOM
@ -128,7 +128,7 @@ test_command_with_want
### blockchain.address.get_balance ### blockchain.address.get_balance
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .result.confirmed | jq .result.confirmed
EOM EOM
@ -138,7 +138,7 @@ test_command_with_want
## blockchain.address.get_history ## blockchain.address.get_history
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result.confirmed | length' | jq '.result.confirmed | length'
EOM EOM
@ -148,7 +148,7 @@ test_command_with_want
## blockchain.address.listunspent ## blockchain.address.listunspent
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result | length' | jq '.result | length'
EOM EOM
@ -160,7 +160,7 @@ test_command_with_want
## blockchain.scripthash.get_mempool ## blockchain.scripthash.get_mempool
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g' | jq .error | sed 's/"//g'
EOM EOM
@ -170,7 +170,7 @@ test_command_with_want
## blockchain.scripthash.get_history ## blockchain.scripthash.get_history
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g' | jq .error | sed 's/"//g'
EOM EOM
@ -180,13 +180,42 @@ test_command_with_want
## blockchain.scripthash.listunspent ## blockchain.scripthash.listunspent
read -r -d '' CMD <<- EOM read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' --data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g' | jq .error | sed 's/"//g'
EOM EOM
WANT="encoding/hex: invalid byte: U+0047 'G'" WANT="encoding/hex: invalid byte: U+0047 'G'"
test_command_with_want test_command_with_want
## server.banner
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.banner", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="You are connected to an 0.107.0 server."
test_command_with_want
## server.version
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.version", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="0.107.0"
test_command_with_want
## server.features
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.features", "params":[]}'
EOM
WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}'
test_command_with_want
# metrics endpoint testing # metrics endpoint testing
WANT=0 WANT=0

View file

@ -1,6 +1,7 @@
package server package server
import ( import (
"fmt"
"log" "log"
"os" "os"
"strconv" "strconv"
@ -19,26 +20,37 @@ const (
// Args struct contains the arguments to the hub server. // Args struct contains the arguments to the hub server.
type Args struct { type Args struct {
CmdType int CmdType int
Host string Host string
Port string Port string
DBPath string DBPath string
Chain *string Chain *string
EsHost string EsHost string
EsPort string EsPort string
PrometheusPort string PrometheusPort string
NotifierPort string NotifierPort string
JSONRPCPort int JSONRPCPort int
JSONRPCHTTPPort int JSONRPCHTTPPort int
MaxSessions int MaxSessions int
SessionTimeout int SessionTimeout int
EsIndex string EsIndex string
RefreshDelta int RefreshDelta int
CacheTTL int CacheTTL int
PeerFile string PeerFile string
Country string Banner *string
BlockingChannelIds []string Country string
FilteringChannelIds []string BlockingChannelIds []string
FilteringChannelIds []string
GenesisHash string
ServerVersion string
ProtocolMin string
ProtocolMax string
ServerDescription string
PaymentAddress string
DonationAddress string
DailyFee string
Debug bool Debug bool
DisableEs bool DisableEs bool
DisableLoadPeers bool DisableLoadPeers bool
@ -54,21 +66,32 @@ type Args struct {
} }
const ( const (
DefaultHost = "0.0.0.0" DefaultHost = "0.0.0.0"
DefaultPort = "50051" DefaultPort = "50051"
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost" DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims" DefaultEsIndex = "claims"
DefaultEsPort = "9200" DefaultEsPort = "9200"
DefaultPrometheusPort = "2112" DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080" DefaultNotifierPort = "18080"
DefaultJSONRPCPort = 50001 DefaultJSONRPCPort = 50001
DefaultMaxSessions = 10000 DefaultJSONRPCHTTPPort = 50002
DefaultSessionTimeout = 300 DefaultMaxSessions = 10000
DefaultRefreshDelta = 5 DefaultSessionTimeout = 300
DefaultCacheTTL = 5 DefaultRefreshDelta = 5
DefaultPeerFile = "peers.txt" DefaultCacheTTL = 5
DefaultCountry = "US" DefaultPeerFile = "peers.txt"
DefaultBannerFile = ""
DefaultCountry = "US"
HUB_PROTOCOL_VERSION = "0.107.0"
PROTOCOL_MIN = "0.54.0"
PROTOCOL_MAX = "0.199.0"
DefaultServerDescription = "Herald"
DefaultPaymentAddress = ""
DefaultDonationAddress = ""
DefaultDailyFee = "1.0"
DefaultDisableLoadPeers = false DefaultDisableLoadPeers = false
DefaultDisableStartPrometheus = false DefaultDisableStartPrometheus = false
DefaultDisableStartUDP = false DefaultDisableStartUDP = false
@ -86,6 +109,73 @@ var (
DefaultFilteringChannelIds = []string{} DefaultFilteringChannelIds = []string{}
) )
func loadBanner(bannerFile *string, serverVersion string) *string {
var banner string
data, err := os.ReadFile(*bannerFile)
if err != nil {
banner = fmt.Sprintf("You are connected to an %s server.", serverVersion)
} else {
banner = string(data)
}
/*
banner := os.Getenv("BANNER")
if banner == "" {
return nil
}
*/
return &banner
}
// MakeDefaultArgs creates a default set of arguments for testing the server.
func MakeDefaultTestArgs() *Args {
args := &Args{
CmdType: ServeCmd,
Host: DefaultHost,
Port: DefaultPort,
DBPath: DefaultDBPath,
EsHost: DefaultEsHost,
EsPort: DefaultEsPort,
PrometheusPort: DefaultPrometheusPort,
NotifierPort: DefaultNotifierPort,
JSONRPCPort: DefaultJSONRPCPort,
JSONRPCHTTPPort: DefaultJSONRPCHTTPPort,
MaxSessions: DefaultMaxSessions,
SessionTimeout: DefaultSessionTimeout,
EsIndex: DefaultEsIndex,
RefreshDelta: DefaultRefreshDelta,
CacheTTL: DefaultCacheTTL,
PeerFile: DefaultPeerFile,
Banner: nil,
Country: DefaultCountry,
GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(),
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: DefaultServerDescription,
PaymentAddress: DefaultPaymentAddress,
DonationAddress: DefaultDonationAddress,
DailyFee: DefaultDailyFee,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// GetEnvironment takes the environment variables as an array of strings // GetEnvironment takes the environment variables as an array of strings
// and a getkeyval function to turn it into a map. // and a getkeyval function to turn it into a map.
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string {
func ParseArgs(searchRequest *pb.SearchRequest) *Args { func ParseArgs(searchRequest *pb.SearchRequest) *Args {
environment := GetEnvironmentStandard() environment := GetEnvironmentStandard()
parser := argparse.NewParser("hub", "hub server and client") parser := argparse.NewParser("herald", "herald server and client")
serveCmd := parser.NewCommand("serve", "start the hub server") serveCmd := parser.NewCommand("serve", "start the hub server")
searchCmd := parser.NewCommand("search", "claim search") searchCmd := parser.NewCommand("search", "claim search")
@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
return err return err
} }
// main server config arguments
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort}) jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions}) maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout}) sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile}) peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile})
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry}) country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds}) blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds}) filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
// arguments for server features
serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription})
paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress})
donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress})
dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee})
// flags for disabling features
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false}) debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false}) disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers}) disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier}) disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC}) disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
// search command arguments
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"}) claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
@ -177,27 +277,40 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
*jsonRPCPort = DefaultJSONRPCPort *jsonRPCPort = DefaultJSONRPCPort
} }
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
args := &Args{ args := &Args{
CmdType: SearchCmd, CmdType: SearchCmd,
Host: *host, Host: *host,
Port: *port, Port: *port,
DBPath: *dbPath, DBPath: *dbPath,
Chain: chain, Chain: chain,
EsHost: *esHost, EsHost: *esHost,
EsPort: *esPort, EsPort: *esPort,
PrometheusPort: *prometheusPort, PrometheusPort: *prometheusPort,
NotifierPort: *notifierPort, NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort, JSONRPCPort: *jsonRPCPort,
JSONRPCHTTPPort: *jsonRPCHTTPPort, JSONRPCHTTPPort: *jsonRPCHTTPPort,
MaxSessions: *maxSessions, MaxSessions: *maxSessions,
SessionTimeout: *sessionTimeout, SessionTimeout: *sessionTimeout,
EsIndex: *esIndex, EsIndex: *esIndex,
RefreshDelta: *refreshDelta, RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL, CacheTTL: *cacheTTL,
PeerFile: *peerFile, PeerFile: *peerFile,
Country: *country, Banner: banner,
BlockingChannelIds: *blockingChannelIds, Country: *country,
FilteringChannelIds: *filteringChannelIds, BlockingChannelIds: *blockingChannelIds,
FilteringChannelIds: *filteringChannelIds,
GenesisHash: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: *serverDescription,
PaymentAddress: *paymentAddress,
DonationAddress: *donationAddress,
DailyFee: *dailyFee,
Debug: *debug, Debug: *debug,
DisableEs: *disableEs, DisableEs: *disableEs,
DisableLoadPeers: *disableLoadPeers, DisableLoadPeers: *disableLoadPeers,

View file

@ -12,7 +12,8 @@ import (
"github.com/lbryio/herald.go/internal/metrics" "github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server" "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -44,43 +45,11 @@ func removeFile(fileName string) {
} }
} }
// makeDefaultArgs creates a default set of arguments for testing the server.
func makeDefaultArgs() *server.Args {
args := &server.Args{
CmdType: server.ServeCmd,
Host: server.DefaultHost,
Port: server.DefaultPort,
DBPath: server.DefaultDBPath,
EsHost: server.DefaultEsHost,
EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort,
NotifierPort: server.DefaultNotifierPort,
JSONRPCPort: server.DefaultJSONRPCPort,
EsIndex: server.DefaultEsIndex,
RefreshDelta: server.DefaultRefreshDelta,
CacheTTL: server.DefaultCacheTTL,
PeerFile: server.DefaultPeerFile,
Country: server.DefaultCountry,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// TestAddPeer tests the ability to add peers // TestAddPeer tests the ability to add peers
func TestAddPeer(t *testing.T) { func TestAddPeer(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
tests := []struct { tests := []struct {
name string name string
@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) {
// TestPeerWriter tests that peers get written properly // TestPeerWriter tests that peers get written properly
func TestPeerWriter(t *testing.T) { func TestPeerWriter(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableWritePeers = false args.DisableWritePeers = false
tests := []struct { tests := []struct {
@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) {
// TestAddPeerEndpoint tests the ability to add peers // TestAddPeerEndpoint tests the ability to add peers
func TestAddPeerEndpoint(t *testing.T) { func TestAddPeerEndpoint(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args2 := makeDefaultArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = "50052"
tests := []struct { tests := []struct {
@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) {
// TestAddPeerEndpoint2 tests the ability to add peers // TestAddPeerEndpoint2 tests the ability to add peers
func TestAddPeerEndpoint2(t *testing.T) { func TestAddPeerEndpoint2(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args2 := makeDefaultArgs() args := server.MakeDefaultTestArgs()
args3 := makeDefaultArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = "50052"
args3.Port = "50053" args3.Port = "50053"
@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
// TestAddPeerEndpoint3 tests the ability to add peers // TestAddPeerEndpoint3 tests the ability to add peers
func TestAddPeerEndpoint3(t *testing.T) { func TestAddPeerEndpoint3(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args2 := makeDefaultArgs() args := server.MakeDefaultTestArgs()
args3 := makeDefaultArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = "50052"
args3.Port = "50053" args3.Port = "50053"
@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
// TestAddPeer tests the ability to add peers // TestAddPeer tests the ability to add peers
func TestUDPServer(t *testing.T) { func TestUDPServer(t *testing.T) {
ctx := context.Background() // ctx := context.Background()
args := makeDefaultArgs() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false args.DisableStartUDP = false
args2 := makeDefaultArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = "50052"
args2.DisableStartUDP = false args2.DisableStartUDP = false

View file

@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead
type BlockGetServerHeightReq struct{} type BlockGetServerHeightReq struct{}
type BlockGetServerHeightResp uint32 type BlockGetServerHeightResp uint32
// blockchain.block.get_server_height
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error { func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
if s.DB == nil || s.DB.LastState == nil { if s.DB == nil || s.DB.LastState == nil {
return fmt.Errorf("unknown height") return fmt.Errorf("unknown height")

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/txscript" "github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcutil" "github.com/lbryio/lbcutil"
"github.com/lbryio/lbry.go/v3/extras/stop"
) )
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions) // Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
@ -57,8 +58,9 @@ var regTestAddrs = [30]string{
func TestServerGetHeight(t *testing.T) { func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) {
func TestGetChunk(t *testing.T) { func TestGetChunk(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) {
func TestGetHeader(t *testing.T) { func TestGetHeader(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) {
func TestHeaders(t *testing.T) { func TestHeaders(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) {
} }
func TestHeadersSubscribe(t *testing.T) { func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer() defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start() sm.start()
defer sm.stop() defer sm.stop()
@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) {
// Set up logic to read a notification. // Set up logic to read a notification.
var received sync.WaitGroup var received sync.WaitGroup
recv := func(client net.Conn) { recv := func(client net.Conn) {
defer received.Done()
buf := make([]byte, 1024) buf := make([]byte, 1024)
len, err := client.Read(buf) len, err := client.Read(buf)
if err != nil { if err != nil {
t.Errorf("read err: %v", err) t.Errorf("read err: %v", err)
} }
t.Logf("len: %v notification: %v", len, string(buf)) t.Logf("len: %v notification: %v", len, string(buf))
received.Done()
} }
received.Add(2) received.Add(2)
go recv(client1) go recv(client1)
@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) {
func TestGetBalance(t *testing.T) { func TestGetBalance(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
moodyjon commented 2022-10-19 16:25:36 +02:00 (Migrated from github.com)
Review

defer db.Shutdown() to match style elsewhere.

`defer db.Shutdown()` to match style elsewhere.
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) {
func TestGetHistory(t *testing.T) { func TestGetHistory(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) {
func TestListUnspent(t *testing.T) { func TestListUnspent(t *testing.T) {
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) grp := stop.NewDebug()
defer toDefer() db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) {
} }
func TestAddressSubscribe(t *testing.T) { func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer() defer db.Shutdown()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start() sm.start()
defer sm.stop() defer sm.stop()

89
server/jsonrpc_server.go Normal file
View file

@ -0,0 +1,89 @@
package server
import (
log "github.com/sirupsen/logrus"
)
type ServerService struct {
Args *Args
}
type ServerFeatureService struct {
Args *Args
}
type ServerFeaturesReq struct{}
type ServerFeaturesRes struct {
Hosts map[string]string `json:"hosts"`
Pruning string `json:"pruning"`
ServerVersion string `json:"server_version"`
ProtocolMin string `json:"protocol_min"`
ProtocolMax string `json:"protocol_max"`
GenesisHash string `json:"genesis_hash"`
Description string `json:"description"`
PaymentAddress string `json:"payment_address"`
DonationAddress string `json:"donation_address"`
DailyFee string `json:"daily_fee"`
HashFunction string `json:"hash_function"`
TrendingAlgorithm string `json:"trending_algorithm"`
}
// Features is the json rpc endpoint for 'server.features'.
func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
log.Println("Features")
features := &ServerFeaturesRes{
Hosts: map[string]string{},
Pruning: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
GenesisHash: t.Args.GenesisHash,
Description: t.Args.ServerDescription,
PaymentAddress: t.Args.PaymentAddress,
DonationAddress: t.Args.DonationAddress,
DailyFee: t.Args.DailyFee,
HashFunction: "sha256",
TrendingAlgorithm: "fast_ar",
}
*res = features
return nil
}
type ServerBannerService struct {
Args *Args
}
type ServerBannerReq struct{}
type ServerBannerRes string
// Banner is the json rpc endpoint for 'server.banner'.
func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
log.Println("Banner")
*res = (*ServerBannerRes)(t.Args.Banner)
return nil
}
type ServerVersionService struct {
Args *Args
}
type ServerVersionReq struct{}
type ServerVersionRes string
// Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version")
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
return nil
}

View file

@ -121,6 +121,14 @@ fail1:
goto fail2 goto fail2
} }
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{s.Args}
err = s1.RegisterTCPService(serverSvc, "server")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
r := gorilla_mux.NewRouter() r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1) r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10) port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)

View file

@ -1,7 +1,6 @@
package server_test package server_test
import ( import (
"context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net" "net"
@ -10,6 +9,7 @@ import (
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/server" "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) {
} }
func TestNotifierServer(t *testing.T) { func TestNotifierServer(t *testing.T) {
args := makeDefaultArgs() args := server.MakeDefaultTestArgs()
ctx := context.Background() // ctx := context.Background()
ctx := stop.NewDebug()
hub := server.MakeHubServer(ctx, args) hub := server.MakeHubServer(ctx, args)
go hub.NotifierServer() go hub.NotifierServer()

View file

@ -11,6 +11,7 @@ import (
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server" server "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
) )
@ -55,13 +56,14 @@ func TestSearch(t *testing.T) {
w.Write([]byte(resp)) w.Write([]byte(resp))
} }
context := context.Background() ctx := context.Background()
args := makeDefaultArgs() stopGroup := stop.NewDebug()
hubServer := server.MakeHubServer(context, args) args := server.MakeDefaultTestArgs()
hubServer := server.MakeHubServer(stopGroup, args)
req := &pb.SearchRequest{ req := &pb.SearchRequest{
Text: "asdf", Text: "asdf",
} }
out, err := hubServer.Search(context, req) out, err := hubServer.Search(ctx, req)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/lbryio/herald.go/meta" "github.com/lbryio/herald.go/meta"
pb "github.com/lbryio/herald.go/protobuf/go" pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -53,6 +54,7 @@ type Server struct {
HeightSubs map[net.Addr]net.Conn HeightSubs map[net.Addr]net.Conn
HeightSubsMut sync.RWMutex HeightSubsMut sync.RWMutex
NotifierChan chan interface{} NotifierChan chan interface{}
Grp *stop.Group
sessionManager *sessionManager sessionManager *sessionManager
pb.UnimplementedHubServer pb.UnimplementedHubServer
} }
@ -149,7 +151,7 @@ func (s *Server) Run() {
} }
} }
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub") tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil { if err != nil {
logrus.Info(err) logrus.Info(err)
@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
if err != nil { if err != nil {
logrus.Info(err) logrus.Info(err)
} }
myDB, _, err := db.GetProdDB(args.DBPath, tmpName) myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
if err != nil { if err != nil {
// Can't load the db, fail loudly // Can't load the db, fail loudly
logrus.Info(err) logrus.Info(err)
@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
// MakeHubServer takes the arguments given to a hub when it's started and // MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers, // initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines. // creates needed internal data structures, and initializes goroutines.
func MakeHubServer(ctx context.Context, args *Args) *Server { func MakeHubServer(grp *stop.Group, args *Args) *Server {
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0)) grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
multiSpaceRe, err := regexp.Compile(`\s{2,}`) multiSpaceRe, err := regexp.Compile(`\s{2,}`)
@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
//TODO: is this the right place to load the db? //TODO: is this the right place to load the db?
var myDB *db.ReadOnlyDBColumnFamily var myDB *db.ReadOnlyDBColumnFamily
// var dbShutdown = func() {}
if !args.DisableResolve { if !args.DisableResolve {
myDB, err = LoadDatabase(args) myDB, err = LoadDatabase(args, grp)
moodyjon commented 2022-10-19 15:57:30 +02:00 (Migrated from github.com)
Review

Not needed.

Not needed.
if err != nil { if err != nil {
logrus.Warning(err) logrus.Warning(err)
} }
} }
moodyjon commented 2022-10-14 16:20:05 +02:00 (Migrated from github.com)
Review

Ahaa. So this is how it gets set. You might consider passing grp down into LoadDatabase(). So it gets initialized ASAP. Also, don't forget the ReadonlyDBColumnFamilies instance constructed in db_test.go.

Ahaa. So this is how it gets set. You might consider passing `grp` down into `LoadDatabase()`. So it gets initialized ASAP. Also, don't forget the `ReadonlyDBColumnFamilies` instance constructed in db_test.go.
// Determine which chain to use based on db and cli values
dbChain := (*chaincfg.Params)(nil) dbChain := (*chaincfg.Params)(nil)
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil { if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
// The chain params can be inferred from DBStateValue. // The chain params can be inferred from DBStateValue.
@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
} }
logrus.Infof("network: %v", chain.Name) logrus.Infof("network: %v", chain.Name)
args.GenesisHash = chain.GenesisHash.String()
sessionGrp := stop.New(grp)
s := &Server{ s := &Server{
GrpcServer: grpcServer, GrpcServer: grpcServer,
Args: args, Args: args,
@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
HeightSubs: make(map[net.Addr]net.Conn), HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{}, HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}), NotifierChan: make(chan interface{}),
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout), Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
} }
// Start up our background services // Start up our background services

View file

@ -1,5 +1,11 @@
package server package server
import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error { func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
return s.addPeer return s.addPeer
} }
@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
func (s *Server) GetNumPeersExported() func() int64 { func (s *Server) GetNumPeersExported() func() int64 {
return s.getNumPeers return s.getNumPeers
} }
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(db, args, grp, chain)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -114,13 +115,15 @@ type sessionMap map[uintptr]*session
type sessionManager struct { type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state // sessionsMut protects sessions, headerSubs, hashXSubs state
sessionsMut sync.RWMutex sessionsMut sync.RWMutex
sessions sessionMap sessions sessionMap
sessionsWait sync.WaitGroup // sessionsWait sync.WaitGroup
grp *stop.Group
sessionsMax int sessionsMax int
sessionTimeout time.Duration sessionTimeout time.Duration
manageTicker *time.Ticker manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily db *db.ReadOnlyDBColumnFamily
args *Args
chain *chaincfg.Params chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe' // headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap headerSubs sessionMap
@ -128,13 +131,15 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap hashXSubs map[[HASHX_LEN]byte]sessionMap
} }
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{ return &sessionManager{
sessions: make(sessionMap), sessions: make(sessionMap),
sessionsMax: sessionsMax, grp: grp,
sessionTimeout: time.Duration(sessionTimeout) * time.Second, sessionsMax: args.MaxSessions,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second), sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db, db: db,
args: args,
chain: chain, chain: chain,
headerSubs: make(sessionMap), headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
} }
func (sm *sessionManager) start() { func (sm *sessionManager) start() {
sm.grp.Add(1)
go sm.manage() go sm.manage()
} }
@ -168,7 +174,13 @@ func (sm *sessionManager) manage() {
} }
sm.sessionsMut.Unlock() sm.sessionsMut.Unlock()
// Wait for next management clock tick. // Wait for next management clock tick.
<-sm.manageTicker.C select {
case <-sm.grp.Ch():
sm.grp.Done()
return
case <-sm.manageTicker.C:
continue
}
} }
} }
@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
// each request and update subscriptions. // each request and update subscriptions.
s1 := rpc.NewServer() s1 := rpc.NewServer()
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{sm.args}
err := s1.RegisterName("server", serverSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers. // Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db} claimtrieSvc := &ClaimtrieService{sm.db}
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc) err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil { if err != nil {
log.Errorf("RegisterService: %v\n", err) log.Errorf("RegisterName: %v\n", err)
} }
// Register other "blockchain.{block,address,scripthash}.*" handlers. // Register other "blockchain.{block,address,scripthash}.*" handlers.
@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
goto fail goto fail
} }
sm.sessionsWait.Add(1) sm.grp.Add(1)
go func() { go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
log.Infof("session %v goroutine exit", sess.addr.String()) log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done() sm.grp.Done()
}() }()
return sess return sess

View file

@ -11,7 +11,7 @@ import (
// TestUDPPing tests UDPPing correctness against prod server. // TestUDPPing tests UDPPing correctness against prod server.
func TestUDPPing(t *testing.T) { func TestUDPPing(t *testing.T) {
args := makeDefaultArgs() args := server.MakeDefaultTestArgs()
args.DisableStartUDP = true args.DisableStartUDP = true
tests := []struct { tests := []struct {

View file

@ -8,12 +8,13 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// shutdownRequestChannel is used to initiate shutdown from one of the // shutdownRequestChannel is used to initiate shutdown from one of the
// subsystems using the same code paths as when an interrupt signal is received. // subsystems using the same code paths as when an interrupt signal is received.
var shutdownRequestChannel = make(chan struct{}) var shutdownRequestChannel = make(stop.Chan)
// interruptSignals defines the default signals to catch in order to do a proper // interruptSignals defines the default signals to catch in order to do a proper
// shutdown. This may be modified during init depending on the platform. // shutdown. This may be modified during init depending on the platform.

View file

@ -10,9 +10,12 @@ package main
import ( import (
"os" "os"
"syscall" "syscall"
"github.com/lbryio/lbry.go/v3/extras/stop"
) )
// initsignals sets the signals to be caught by the signal handler // initsignals sets the signals to be caught by the signal handler
func initsignals() { func initsignals(stopCh stop.Chan) {
shutdownRequestChannel = stopCh
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
} }