Server endpoints goroutine refactor #69
21 changed files with 546 additions and 307 deletions
76
db/db.go
76
db/db.go
|
@ -8,7 +8,6 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/db/prefixes"
|
"github.com/lbryio/herald.go/db/prefixes"
|
||||||
|
@ -16,6 +15,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/herald.go/internal/metrics"
|
"github.com/lbryio/herald.go/internal/metrics"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct {
|
||||||
BlockedChannels map[string][]byte
|
BlockedChannels map[string][]byte
|
||||||
FilteredStreams map[string][]byte
|
FilteredStreams map[string][]byte
|
||||||
FilteredChannels map[string][]byte
|
FilteredChannels map[string][]byte
|
||||||
|
|||||||
OpenIterators map[string][]chan struct{}
|
Grp *stop.Group
|
||||||
ItMut sync.RWMutex
|
|
||||||
ShutdownChan chan struct{}
|
|
||||||
DoneChan chan struct{}
|
|
||||||
ShutdownCalled bool
|
|
||||||
Cleanup func()
|
Cleanup func()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,19 +335,10 @@ func interruptRequested(interrupted <-chan struct{}) bool {
|
||||||
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
ch := make(chan *prefixes.PrefixRowKV)
|
ch := make(chan *prefixes.PrefixRowKV)
|
||||||
|
|
||||||
iterKey := fmt.Sprintf("%p", opts)
|
// Check if we've been told to shutdown in between getting created and getting here
|
||||||
if opts.DB != nil {
|
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
|
||||||
opts.DB.ItMut.Lock()
|
opts.Grp.Done()
|
||||||
This simplification eliminating `if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {`
This simplification eliminating `ShutdownCalled` is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in `opts.WithDB()`
|
|||||||
// There is a tiny chance that we were wating on the above lock while shutdown was
|
return ch
|
||||||
// being called and by the time we get it the db has already notified all active
|
|
||||||
// iterators to shutdown. In this case we go to the else branch.
|
|
||||||
if !opts.DB.ShutdownCalled {
|
|
||||||
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
|
|
||||||
opts.DB.ItMut.Unlock()
|
|
||||||
} else {
|
|
||||||
opts.DB.ItMut.Unlock()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ro := grocksdb.NewDefaultReadOptions()
|
ro := grocksdb.NewDefaultReadOptions()
|
||||||
|
@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
it.Close()
|
it.Close()
|
||||||
close(ch)
|
close(ch)
|
||||||
ro.Destroy()
|
ro.Destroy()
|
||||||
if opts.DB != nil {
|
if opts.Grp != nil {
|
||||||
opts.DoneChan <- struct{}{}
|
// opts.Grp.DoneNamed(iterKey)
|
||||||
opts.DB.ItMut.Lock()
|
opts.Grp.Done()
|
||||||
delete(opts.DB.OpenIterators, iterKey)
|
|
||||||
opts.DB.ItMut.Unlock()
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
if kv = opts.ReadRow(&prevKey); kv != nil {
|
if kv = opts.ReadRow(&prevKey); kv != nil {
|
||||||
ch <- kv
|
ch <- kv
|
||||||
}
|
}
|
||||||
if interruptRequested(opts.ShutdownChan) {
|
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProdDB returns a db that is used for production.
|
// GetProdDB returns a db that is used for production.
|
||||||
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
|
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
|
||||||
prefixNames := prefixes.GetPrefixes()
|
prefixNames := prefixes.GetPrefixes()
|
||||||
// additional prefixes that aren't in the code explicitly
|
// additional prefixes that aren't in the code explicitly
|
||||||
cfNames := []string{"default", "e", "d", "c"}
|
cfNames := []string{"default", "e", "d", "c"}
|
||||||
|
@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
||||||
cfNames = append(cfNames, cfName)
|
cfNames = append(cfNames, cfName)
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
|
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
|
||||||
|
|
||||||
cleanupFiles := func() {
|
cleanupFiles := func() {
|
||||||
err = os.RemoveAll(secondaryPath)
|
err = os.RemoveAll(secondaryPath)
|
||||||
|
@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, cleanupFiles, err
|
cleanupFiles()
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupDB := func() {
|
cleanupDB := func() {
|
||||||
|
@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
||||||
}
|
}
|
||||||
db.Cleanup = cleanupDB
|
db.Cleanup = cleanupDB
|
||||||
|
|
||||||
return db, cleanupDB, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
|
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
|
||||||
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
|
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
|
||||||
opts := grocksdb.NewDefaultOptions()
|
opts := grocksdb.NewDefaultOptions()
|
||||||
roOpts := grocksdb.NewDefaultReadOptions()
|
roOpts := grocksdb.NewDefaultReadOptions()
|
||||||
cfOpt := grocksdb.NewDefaultOptions()
|
cfOpt := grocksdb.NewDefaultOptions()
|
||||||
|
@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
|
||||||
LastState: nil,
|
LastState: nil,
|
||||||
Height: 0,
|
Height: 0,
|
||||||
Headers: nil,
|
Headers: nil,
|
||||||
OpenIterators: make(map[string][]chan struct{}),
|
Grp: grp,
|
||||||
ItMut: sync.RWMutex{},
|
|
||||||
ShutdownChan: make(chan struct{}, 1),
|
|
||||||
ShutdownCalled: false,
|
|
||||||
DoneChan: make(chan struct{}, 1),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = myDB.ReadDBState() //TODO: Figure out right place for this
|
err = myDB.ReadDBState() //TODO: Figure out right place for this
|
||||||
|
@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
|
||||||
|
|
||||||
// Shutdown shuts down the db.
|
// Shutdown shuts down the db.
|
||||||
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
||||||
log.Println("Sending message to ShutdownChan...")
|
db.Grp.StopAndWait()
|
||||||
db.ShutdownChan <- struct{}{}
|
|
||||||
log.Println("Locking iterator mutex...")
|
|
||||||
db.ItMut.Lock()
|
|
||||||
log.Println("Setting ShutdownCalled to true...")
|
|
||||||
db.ShutdownCalled = true
|
|
||||||
log.Println("Notifying iterators to shutdown...")
|
|
||||||
for _, it := range db.OpenIterators {
|
|
||||||
it[1] <- struct{}{}
|
|
||||||
}
|
|
||||||
log.Println("Waiting for iterators to shutdown...")
|
|
||||||
for _, it := range db.OpenIterators {
|
|
||||||
<-it[0]
|
|
||||||
}
|
|
||||||
log.Println("Unlocking iterator mutex...")
|
|
||||||
db.ItMut.Unlock()
|
|
||||||
log.Println("Sending message to DoneChan...")
|
|
||||||
<-db.DoneChan
|
|
||||||
log.Println("Calling cleanup...")
|
log.Println("Calling cleanup...")
|
||||||
db.Cleanup()
|
db.Cleanup()
|
||||||
log.Println("Leaving Shutdown...")
|
log.Println("Leaving Shutdown...")
|
||||||
|
@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
||||||
// to keep the db readonly view up to date and handle reorgs on the
|
// to keep the db readonly view up to date and handle reorgs on the
|
||||||
// blockchain.
|
// blockchain.
|
||||||
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
||||||
|
db.Grp.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
lastPrint := time.Now()
|
lastPrint := time.Now()
|
||||||
for {
|
for {
|
||||||
|
@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
||||||
log.Infof("Error detecting changes: %#v", err)
|
log.Infof("Error detecting changes: %#v", err)
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-db.ShutdownChan:
|
case <-db.Grp.Ch():
|
||||||
db.DoneChan <- struct{}{}
|
db.Grp.Done()
|
||||||
return
|
return
|
||||||
case <-time.After(time.Millisecond * 10):
|
case <-time.After(time.Millisecond * 10):
|
||||||
}
|
}
|
||||||
|
|
119
db/db_test.go
119
db/db_test.go
|
@ -7,12 +7,12 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
dbpkg "github.com/lbryio/herald.go/db"
|
dbpkg "github.com/lbryio/herald.go/db"
|
||||||
"github.com/lbryio/herald.go/db/prefixes"
|
"github.com/lbryio/herald.go/db/prefixes"
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ import (
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
|
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
|
||||||
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) {
|
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
|
||||||
|
|
||||||
log.Println(filePath)
|
log.Println(filePath)
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
|
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
reader := csv.NewReader(file)
|
reader := csv.NewReader(file)
|
||||||
records, err := reader.ReadAll()
|
records, err := reader.ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
wOpts := grocksdb.NewDefaultWriteOptions()
|
wOpts := grocksdb.NewDefaultWriteOptions()
|
||||||
|
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
opts.SetCreateIfMissing(true)
|
opts.SetCreateIfMissing(true)
|
||||||
db, err := grocksdb.OpenDb(opts, "tmp")
|
db, err := grocksdb.OpenDb(opts, "tmp")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
|
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
log.Println(cfName)
|
log.Println(cfName)
|
||||||
handle, err := db.CreateColumnFamily(opts, cfName)
|
handle, err := db.CreateColumnFamily(opts, cfName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
handleMap[cfName] = handle
|
handleMap[cfName] = handle
|
||||||
}
|
}
|
||||||
|
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
for _, record := range records[1:] {
|
for _, record := range records[1:] {
|
||||||
cf := record[0]
|
cf := record[0]
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
handle := handleMap[string(cf)]
|
handle := handleMap[string(cf)]
|
||||||
key, err := hex.DecodeString(record[1])
|
key, err := hex.DecodeString(record[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
val, err := hex.DecodeString(record[2])
|
val, err := hex.DecodeString(record[2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
db.PutCF(wOpts, handle, key, val)
|
db.PutCF(wOpts, handle, key, val)
|
||||||
}
|
}
|
||||||
|
@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
LastState: nil,
|
LastState: nil,
|
||||||
Height: 0,
|
Height: 0,
|
||||||
Headers: nil,
|
Headers: nil,
|
||||||
OpenIterators: make(map[string][]chan struct{}),
|
Grp: stop.New(),
|
||||||
ItMut: sync.RWMutex{},
|
Cleanup: toDefer,
|
||||||
ShutdownChan: make(chan struct{}, 1),
|
|
||||||
DoneChan: make(chan struct{}, 1),
|
|
||||||
ShutdownCalled: false,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
|
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
|
||||||
|
@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
|
|
||||||
err = myDB.InitTxCounts()
|
err = myDB.InitTxCounts()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// err = dbpkg.InitHeaders(myDB)
|
// err = dbpkg.InitHeaders(myDB)
|
||||||
|
@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
||||||
// return nil, nil, nil, err
|
// return nil, nil, nil, err
|
||||||
// }
|
// }
|
||||||
|
|
||||||
return myDB, records, toDefer, nil
|
return myDB, records, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
|
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
|
||||||
|
@ -247,17 +244,18 @@ func CatCSV(filePath string) {
|
||||||
|
|
||||||
func TestCatFullDB(t *testing.T) {
|
func TestCatFullDB(t *testing.T) {
|
||||||
t.Skip("Skipping full db test")
|
t.Skip("Skipping full db test")
|
||||||
|
grp := stop.New()
|
||||||
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
||||||
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
||||||
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||||
// url := "lbry://@lbry"
|
// url := "lbry://@lbry"
|
||||||
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
||||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
|
||||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
|
|
||||||
defer toDefer()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) {
|
||||||
// TestOpenFullDB Tests running a resolve on a full db.
|
// TestOpenFullDB Tests running a resolve on a full db.
|
||||||
func TestOpenFullDB(t *testing.T) {
|
func TestOpenFullDB(t *testing.T) {
|
||||||
t.Skip("Skipping full db test")
|
t.Skip("Skipping full db test")
|
||||||
|
grp := stop.New()
|
||||||
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
||||||
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
||||||
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||||
|
@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) {
|
||||||
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
||||||
// url := "lbry://@lbry$1"
|
// url := "lbry://@lbry$1"
|
||||||
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
|
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
|
||||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
|
||||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
|
||||||
defer toDefer()
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) {
|
||||||
func TestResolve(t *testing.T) {
|
func TestResolve(t *testing.T) {
|
||||||
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||||
filePath := "../testdata/FULL_resolve.csv"
|
filePath := "../testdata/FULL_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
Have ...Ditto all the other places calling Have `OpenAndFillTmpDBColumnFamilies()` assign `db.Cleanup = toDefer` inside to simplify this.
...Ditto all the other places calling `OpenAndFillTmpDBColumnFamilies()`.
|
|||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
expandedResolveResult := db.Resolve(url)
|
expandedResolveResult := db.Resolve(url)
|
||||||
log.Printf("%#v\n", expandedResolveResult)
|
log.Printf("%#v\n", expandedResolveResult)
|
||||||
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
|
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
|
||||||
|
@ -321,11 +321,11 @@ func TestResolve(t *testing.T) {
|
||||||
func TestGetDBState(t *testing.T) {
|
func TestGetDBState(t *testing.T) {
|
||||||
filePath := "../testdata/s_resolve.csv"
|
filePath := "../testdata/s_resolve.csv"
|
||||||
want := uint32(1072108)
|
want := uint32(1072108)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
state, err := db.GetDBState()
|
state, err := db.GetDBState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
|
||||||
// Should be non-existent
|
// Should be non-existent
|
||||||
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
||||||
filePath := "../testdata/W_resolve.csv"
|
filePath := "../testdata/W_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
|
|
||||||
count, err := db.GetRepostedCount(channelHash)
|
count, err := db.GetRepostedCount(channelHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
|
||||||
// Should be non-existent
|
// Should be non-existent
|
||||||
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
||||||
filePath := "../testdata/j_resolve.csv"
|
filePath := "../testdata/j_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
|
|
||||||
count, err := db.GetRepostedCount(channelHash)
|
count, err := db.GetRepostedCount(channelHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) {
|
||||||
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
|
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
|
||||||
filePath := "../testdata/V_resolve.csv"
|
filePath := "../testdata/V_resolve.csv"
|
||||||
// want := uint32(3670)
|
// want := uint32(3670)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
|
|
||||||
res, err := db.GetRepost(channelHash)
|
res, err := db.GetRepost(channelHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
|
||||||
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
|
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
|
||||||
filePath := "../testdata/Z_resolve.csv"
|
filePath := "../testdata/Z_resolve.csv"
|
||||||
want := uint32(3670)
|
want := uint32(3670)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
count, err := db.GetClaimsInChannelCount(channelHash)
|
count, err := db.GetClaimsInChannelCount(channelHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
|
||||||
var position uint16 = 0
|
var position uint16 = 0
|
||||||
filePath := "../testdata/F_resolve.csv"
|
filePath := "../testdata/F_resolve.csv"
|
||||||
log.Println(filePath)
|
log.Println(filePath)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
|
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
|
||||||
filePath := "../testdata/F_cat.csv"
|
filePath := "../testdata/F_cat.csv"
|
||||||
normalName := "cat"
|
normalName := "cat"
|
||||||
claimId := "0"
|
claimId := "0"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
|
|
||||||
ch := db.ClaimShortIdIter(normalName, claimId)
|
ch := db.ClaimShortIdIter(normalName, claimId)
|
||||||
|
|
||||||
|
@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
|
||||||
var txNum uint32 = 1456296
|
var txNum uint32 = 1456296
|
||||||
var position uint16 = 0
|
var position uint16 = 0
|
||||||
filePath := "../testdata/G_2.csv"
|
filePath := "../testdata/G_2.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
val, err := db.GetCachedClaimHash(txNum, position)
|
val, err := db.GetCachedClaimHash(txNum, position)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
|
||||||
var val []byte = nil
|
var val []byte = nil
|
||||||
|
|
||||||
filePath := "../testdata/I_resolve.csv"
|
filePath := "../testdata/I_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
|
|
||||||
val, err = db.GetChannelForClaim(claimHash, txNum, position)
|
val, err = db.GetChannelForClaim(claimHash, txNum, position)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
|
||||||
want := uint64(20000006)
|
want := uint64(20000006)
|
||||||
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
||||||
claimHash, _ := hex.DecodeString(claimHashStr)
|
claimHash, _ := hex.DecodeString(claimHashStr)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
db.Height = 999999999
|
db.Height = 999999999
|
||||||
|
|
||||||
amount, err := db.GetEffectiveAmount(claimHash, true)
|
amount, err := db.GetEffectiveAmount(claimHash, true)
|
||||||
|
@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
|
||||||
want := uint64(21000006)
|
want := uint64(21000006)
|
||||||
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
||||||
claimHash, _ := hex.DecodeString(claimHashStr)
|
claimHash, _ := hex.DecodeString(claimHashStr)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
db.Height = 999999999
|
db.Height = 999999999
|
||||||
|
|
||||||
amount, err := db.GetEffectiveAmount(claimHash, false)
|
amount, err := db.GetEffectiveAmount(claimHash, false)
|
||||||
|
@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
filePath := "../testdata/a_resolve.csv"
|
filePath := "../testdata/a_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
res, err := db.GetSupportAmount(claimHash)
|
res, err := db.GetSupportAmount(claimHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) {
|
||||||
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
|
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
|
||||||
|
|
||||||
filePath := "../testdata/X_resolve.csv"
|
filePath := "../testdata/X_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
resHash, err := db.GetTxHash(txNum)
|
resHash, err := db.GetTxHash(txNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) {
|
||||||
txNum := uint32(0x6284e3)
|
txNum := uint32(0x6284e3)
|
||||||
position := uint16(0x0)
|
position := uint16(0x0)
|
||||||
want := uint32(0xa6b65)
|
want := uint32(0xa6b65)
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
activation, err := db.GetActivation(txNum, position)
|
activation, err := db.GetActivation(txNum, position)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filePath := "../testdata/E_resolve.csv"
|
filePath := "../testdata/E_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
res, err := db.GetCachedClaimTxo(claimHash, true)
|
res, err := db.GetCachedClaimTxo(claimHash, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
|
||||||
claimName := internal.NormalizeName("@Styxhexenhammer666")
|
claimName := internal.NormalizeName("@Styxhexenhammer666")
|
||||||
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
|
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
|
||||||
filePath := "../testdata/P_resolve.csv"
|
filePath := "../testdata/P_resolve.csv"
|
||||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer toDefer()
|
|
||||||
res, err := db.GetControllingClaim(claimName)
|
res, err := db.GetControllingClaim(claimName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/db/prefixes"
|
"github.com/lbryio/herald.go/db/prefixes"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -22,12 +23,11 @@ type IterOptions struct {
|
||||||
IncludeValue bool
|
IncludeValue bool
|
||||||
RawKey bool
|
RawKey bool
|
||||||
RawValue bool
|
RawValue bool
|
||||||
ShutdownChan chan struct{}
|
Grp *stop.Group
|
||||||
DoneChan chan struct{}
|
// DB *ReadOnlyDBColumnFamily
|
||||||
DB *ReadOnlyDBColumnFamily
|
CfHandle *grocksdb.ColumnFamilyHandle
|
||||||
CfHandle *grocksdb.ColumnFamilyHandle
|
It *grocksdb.Iterator
|
||||||
It *grocksdb.Iterator
|
Serializer *prefixes.SerializationAPI
|
||||||
Serializer *prefixes.SerializationAPI
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIterateOptions creates a defualt options structure for a db iterator.
|
// NewIterateOptions creates a defualt options structure for a db iterator.
|
||||||
|
@ -43,12 +43,11 @@ func NewIterateOptions() *IterOptions {
|
||||||
IncludeValue: false,
|
IncludeValue: false,
|
||||||
RawKey: false,
|
RawKey: false,
|
||||||
RawValue: false,
|
RawValue: false,
|
||||||
ShutdownChan: make(chan struct{}, 1),
|
Grp: nil,
|
||||||
DoneChan: make(chan struct{}, 1),
|
// DB: nil,
|
||||||
DB: nil,
|
CfHandle: nil,
|
||||||
CfHandle: nil,
|
It: nil,
|
||||||
It: nil,
|
Serializer: prefixes.ProductionAPI,
|
||||||
Serializer: prefixes.ProductionAPI,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
|
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
|
||||||
o.DB = db
|
// o.Grp.AddNamed(1, iterKey)
|
||||||
|
o.Grp = stop.New(db.Grp)
|
||||||
|
o.Grp.Add(1)
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
5
go.mod
5
go.mod
|
@ -24,9 +24,6 @@ require (
|
||||||
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
|
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||||
|
@ -43,7 +40,7 @@ require (
|
||||||
github.com/prometheus/procfs v0.6.0 // indirect
|
github.com/prometheus/procfs v0.6.0 // indirect
|
||||||
github.com/stretchr/testify v1.7.0 // indirect
|
github.com/stretchr/testify v1.7.0 // indirect
|
||||||
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
|
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
|
||||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
|
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
||||||
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect
|
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
|
||||||
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
|
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
|
||||||
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
|
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
|
||||||
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
|
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
|
||||||
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
|
|
||||||
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
|
|
||||||
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
|
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
|
||||||
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
|
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
|
||||||
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
|
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
|
||||||
|
@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
|
||||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||||
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
|
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
|
||||||
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
|
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
|
||||||
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
|
|
||||||
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
|
|
||||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||||
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
|
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
|
||||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||||
|
|
12
main.go
12
main.go
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
"github.com/lbryio/herald.go/server"
|
"github.com/lbryio/herald.go/server"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
@ -27,13 +28,16 @@ func main() {
|
||||||
|
|
||||||
if args.CmdType == server.ServeCmd {
|
if args.CmdType == server.ServeCmd {
|
||||||
// This will cancel goroutines with the server finishes.
|
// This will cancel goroutines with the server finishes.
|
||||||
ctxWCancel, cancel := context.WithCancel(ctx)
|
// ctxWCancel, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
// defer cancel()
|
||||||
|
stopGroup := stop.New()
|
||||||
|
// defer stopGroup.Stop()
|
||||||
|
|
||||||
initsignals()
|
initsignals(stopGroup.Ch())
|
||||||
interrupt := interruptListener()
|
interrupt := interruptListener()
|
||||||
|
|
||||||
s := server.MakeHubServer(ctxWCancel, args)
|
// s := server.MakeHubServer(ctxWCancel, args)
|
||||||
|
s := server.MakeHubServer(stopGroup, args)
|
||||||
go s.Run()
|
go s.Run()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -88,7 +88,7 @@ test_command_with_want
|
||||||
|
|
||||||
### blockchain.block.get_chunk
|
### blockchain.block.get_chunk
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
|
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
|
||||||
| jq .result | sed 's/"//g' | head -c 100
|
| jq .result | sed 's/"//g' | head -c 100
|
||||||
EOM
|
EOM
|
||||||
|
@ -97,7 +97,7 @@ test_command_with_want
|
||||||
|
|
||||||
### blockchain.block.get_header
|
### blockchain.block.get_header
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
|
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
|
||||||
| jq .result.timestamp
|
| jq .result.timestamp
|
||||||
EOM
|
EOM
|
||||||
|
@ -106,7 +106,7 @@ test_command_with_want
|
||||||
|
|
||||||
### blockchain.block.headers
|
### blockchain.block.headers
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
|
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
|
||||||
| jq .result.count
|
| jq .result.count
|
||||||
EOM
|
EOM
|
||||||
|
@ -116,7 +116,7 @@ test_command_with_want
|
||||||
## blockchain.claimtrie
|
## blockchain.claimtrie
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
|
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
|
||||||
| jq .result.txos[0].tx_hash | sed 's/"//g'
|
| jq .result.txos[0].tx_hash | sed 's/"//g'
|
||||||
EOM
|
EOM
|
||||||
|
@ -128,7 +128,7 @@ test_command_with_want
|
||||||
### blockchain.address.get_balance
|
### blockchain.address.get_balance
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq .result.confirmed
|
| jq .result.confirmed
|
||||||
EOM
|
EOM
|
||||||
|
@ -138,7 +138,7 @@ test_command_with_want
|
||||||
## blockchain.address.get_history
|
## blockchain.address.get_history
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq '.result.confirmed | length'
|
| jq '.result.confirmed | length'
|
||||||
EOM
|
EOM
|
||||||
|
@ -148,7 +148,7 @@ test_command_with_want
|
||||||
## blockchain.address.listunspent
|
## blockchain.address.listunspent
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq '.result | length'
|
| jq '.result | length'
|
||||||
EOM
|
EOM
|
||||||
|
@ -160,7 +160,7 @@ test_command_with_want
|
||||||
## blockchain.scripthash.get_mempool
|
## blockchain.scripthash.get_mempool
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq .error | sed 's/"//g'
|
| jq .error | sed 's/"//g'
|
||||||
EOM
|
EOM
|
||||||
|
@ -170,7 +170,7 @@ test_command_with_want
|
||||||
## blockchain.scripthash.get_history
|
## blockchain.scripthash.get_history
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq .error | sed 's/"//g'
|
| jq .error | sed 's/"//g'
|
||||||
EOM
|
EOM
|
||||||
|
@ -180,13 +180,42 @@ test_command_with_want
|
||||||
## blockchain.scripthash.listunspent
|
## blockchain.scripthash.listunspent
|
||||||
|
|
||||||
read -r -d '' CMD <<- EOM
|
read -r -d '' CMD <<- EOM
|
||||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||||
| jq .error | sed 's/"//g'
|
| jq .error | sed 's/"//g'
|
||||||
EOM
|
EOM
|
||||||
WANT="encoding/hex: invalid byte: U+0047 'G'"
|
WANT="encoding/hex: invalid byte: U+0047 'G'"
|
||||||
test_command_with_want
|
test_command_with_want
|
||||||
|
|
||||||
|
## server.banner
|
||||||
|
|
||||||
|
read -r -d '' CMD <<- EOM
|
||||||
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
|
--data '{"id": 1, "method": "server.banner", "params":[]}'
|
||||||
|
| jq .result | sed 's/"//g'
|
||||||
|
EOM
|
||||||
|
WANT="You are connected to an 0.107.0 server."
|
||||||
|
test_command_with_want
|
||||||
|
|
||||||
|
## server.version
|
||||||
|
|
||||||
|
read -r -d '' CMD <<- EOM
|
||||||
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
|
--data '{"id": 1, "method": "server.version", "params":[]}'
|
||||||
|
| jq .result | sed 's/"//g'
|
||||||
|
EOM
|
||||||
|
WANT="0.107.0"
|
||||||
|
test_command_with_want
|
||||||
|
|
||||||
|
## server.features
|
||||||
|
|
||||||
|
read -r -d '' CMD <<- EOM
|
||||||
|
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||||
|
--data '{"id": 1, "method": "server.features", "params":[]}'
|
||||||
|
EOM
|
||||||
|
WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}'
|
||||||
|
test_command_with_want
|
||||||
|
|
||||||
# metrics endpoint testing
|
# metrics endpoint testing
|
||||||
|
|
||||||
WANT=0
|
WANT=0
|
||||||
|
|
229
server/args.go
229
server/args.go
|
@ -1,6 +1,7 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -19,26 +20,37 @@ const (
|
||||||
|
|
||||||
// Args struct contains the arguments to the hub server.
|
// Args struct contains the arguments to the hub server.
|
||||||
type Args struct {
|
type Args struct {
|
||||||
CmdType int
|
CmdType int
|
||||||
Host string
|
Host string
|
||||||
Port string
|
Port string
|
||||||
DBPath string
|
DBPath string
|
||||||
Chain *string
|
Chain *string
|
||||||
EsHost string
|
EsHost string
|
||||||
EsPort string
|
EsPort string
|
||||||
PrometheusPort string
|
PrometheusPort string
|
||||||
NotifierPort string
|
NotifierPort string
|
||||||
JSONRPCPort int
|
JSONRPCPort int
|
||||||
JSONRPCHTTPPort int
|
JSONRPCHTTPPort int
|
||||||
MaxSessions int
|
MaxSessions int
|
||||||
SessionTimeout int
|
SessionTimeout int
|
||||||
EsIndex string
|
EsIndex string
|
||||||
RefreshDelta int
|
RefreshDelta int
|
||||||
CacheTTL int
|
CacheTTL int
|
||||||
PeerFile string
|
PeerFile string
|
||||||
Country string
|
Banner *string
|
||||||
BlockingChannelIds []string
|
Country string
|
||||||
FilteringChannelIds []string
|
BlockingChannelIds []string
|
||||||
|
FilteringChannelIds []string
|
||||||
|
|
||||||
|
GenesisHash string
|
||||||
|
ServerVersion string
|
||||||
|
ProtocolMin string
|
||||||
|
ProtocolMax string
|
||||||
|
ServerDescription string
|
||||||
|
PaymentAddress string
|
||||||
|
DonationAddress string
|
||||||
|
DailyFee string
|
||||||
|
|
||||||
Debug bool
|
Debug bool
|
||||||
DisableEs bool
|
DisableEs bool
|
||||||
DisableLoadPeers bool
|
DisableLoadPeers bool
|
||||||
|
@ -54,21 +66,32 @@ type Args struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultHost = "0.0.0.0"
|
DefaultHost = "0.0.0.0"
|
||||||
DefaultPort = "50051"
|
DefaultPort = "50051"
|
||||||
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
||||||
DefaultEsHost = "http://localhost"
|
DefaultEsHost = "http://localhost"
|
||||||
DefaultEsIndex = "claims"
|
DefaultEsIndex = "claims"
|
||||||
DefaultEsPort = "9200"
|
DefaultEsPort = "9200"
|
||||||
DefaultPrometheusPort = "2112"
|
DefaultPrometheusPort = "2112"
|
||||||
DefaultNotifierPort = "18080"
|
DefaultNotifierPort = "18080"
|
||||||
DefaultJSONRPCPort = 50001
|
DefaultJSONRPCPort = 50001
|
||||||
DefaultMaxSessions = 10000
|
DefaultJSONRPCHTTPPort = 50002
|
||||||
DefaultSessionTimeout = 300
|
DefaultMaxSessions = 10000
|
||||||
DefaultRefreshDelta = 5
|
DefaultSessionTimeout = 300
|
||||||
DefaultCacheTTL = 5
|
DefaultRefreshDelta = 5
|
||||||
DefaultPeerFile = "peers.txt"
|
DefaultCacheTTL = 5
|
||||||
DefaultCountry = "US"
|
DefaultPeerFile = "peers.txt"
|
||||||
|
DefaultBannerFile = ""
|
||||||
|
DefaultCountry = "US"
|
||||||
|
|
||||||
|
HUB_PROTOCOL_VERSION = "0.107.0"
|
||||||
|
PROTOCOL_MIN = "0.54.0"
|
||||||
|
PROTOCOL_MAX = "0.199.0"
|
||||||
|
DefaultServerDescription = "Herald"
|
||||||
|
DefaultPaymentAddress = ""
|
||||||
|
DefaultDonationAddress = ""
|
||||||
|
DefaultDailyFee = "1.0"
|
||||||
|
|
||||||
DefaultDisableLoadPeers = false
|
DefaultDisableLoadPeers = false
|
||||||
DefaultDisableStartPrometheus = false
|
DefaultDisableStartPrometheus = false
|
||||||
DefaultDisableStartUDP = false
|
DefaultDisableStartUDP = false
|
||||||
|
@ -86,6 +109,73 @@ var (
|
||||||
DefaultFilteringChannelIds = []string{}
|
DefaultFilteringChannelIds = []string{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func loadBanner(bannerFile *string, serverVersion string) *string {
|
||||||
|
var banner string
|
||||||
|
|
||||||
|
data, err := os.ReadFile(*bannerFile)
|
||||||
|
if err != nil {
|
||||||
|
banner = fmt.Sprintf("You are connected to an %s server.", serverVersion)
|
||||||
|
} else {
|
||||||
|
banner = string(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
banner := os.Getenv("BANNER")
|
||||||
|
if banner == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
return &banner
|
||||||
|
}
|
||||||
|
|
||||||
|
// MakeDefaultArgs creates a default set of arguments for testing the server.
|
||||||
|
func MakeDefaultTestArgs() *Args {
|
||||||
|
args := &Args{
|
||||||
|
CmdType: ServeCmd,
|
||||||
|
Host: DefaultHost,
|
||||||
|
Port: DefaultPort,
|
||||||
|
DBPath: DefaultDBPath,
|
||||||
|
EsHost: DefaultEsHost,
|
||||||
|
EsPort: DefaultEsPort,
|
||||||
|
PrometheusPort: DefaultPrometheusPort,
|
||||||
|
NotifierPort: DefaultNotifierPort,
|
||||||
|
JSONRPCPort: DefaultJSONRPCPort,
|
||||||
|
JSONRPCHTTPPort: DefaultJSONRPCHTTPPort,
|
||||||
|
MaxSessions: DefaultMaxSessions,
|
||||||
|
SessionTimeout: DefaultSessionTimeout,
|
||||||
|
EsIndex: DefaultEsIndex,
|
||||||
|
RefreshDelta: DefaultRefreshDelta,
|
||||||
|
CacheTTL: DefaultCacheTTL,
|
||||||
|
PeerFile: DefaultPeerFile,
|
||||||
|
Banner: nil,
|
||||||
|
Country: DefaultCountry,
|
||||||
|
|
||||||
|
GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(),
|
||||||
|
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||||
|
ProtocolMin: PROTOCOL_MIN,
|
||||||
|
ProtocolMax: PROTOCOL_MAX,
|
||||||
|
ServerDescription: DefaultServerDescription,
|
||||||
|
PaymentAddress: DefaultPaymentAddress,
|
||||||
|
DonationAddress: DefaultDonationAddress,
|
||||||
|
DailyFee: DefaultDailyFee,
|
||||||
|
|
||||||
|
DisableEs: true,
|
||||||
|
Debug: true,
|
||||||
|
DisableLoadPeers: true,
|
||||||
|
DisableStartPrometheus: true,
|
||||||
|
DisableStartUDP: true,
|
||||||
|
DisableWritePeers: true,
|
||||||
|
DisableRocksDBRefresh: true,
|
||||||
|
DisableResolve: true,
|
||||||
|
DisableBlockingAndFiltering: true,
|
||||||
|
DisableStartNotifier: true,
|
||||||
|
DisableStartJSONRPC: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
return args
|
||||||
|
}
|
||||||
|
|
||||||
// GetEnvironment takes the environment variables as an array of strings
|
// GetEnvironment takes the environment variables as an array of strings
|
||||||
// and a getkeyval function to turn it into a map.
|
// and a getkeyval function to turn it into a map.
|
||||||
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
|
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
|
||||||
|
@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string {
|
||||||
func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
|
|
||||||
environment := GetEnvironmentStandard()
|
environment := GetEnvironmentStandard()
|
||||||
parser := argparse.NewParser("hub", "hub server and client")
|
parser := argparse.NewParser("herald", "herald server and client")
|
||||||
|
|
||||||
serveCmd := parser.NewCommand("serve", "start the hub server")
|
serveCmd := parser.NewCommand("serve", "start the hub server")
|
||||||
searchCmd := parser.NewCommand("search", "claim search")
|
searchCmd := parser.NewCommand("search", "claim search")
|
||||||
|
@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// main server config arguments
|
||||||
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
|
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
|
||||||
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
|
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
|
||||||
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
|
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
|
||||||
|
@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
||||||
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
||||||
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
|
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
|
||||||
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort})
|
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
|
||||||
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
|
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
|
||||||
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
|
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
|
||||||
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
|
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
|
||||||
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
|
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
|
||||||
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
|
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
|
||||||
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
|
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
|
||||||
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
|
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
|
||||||
|
bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile})
|
||||||
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
|
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
|
||||||
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
|
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
|
||||||
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
|
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
|
||||||
|
|
||||||
|
// arguments for server features
|
||||||
|
serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription})
|
||||||
|
paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress})
|
||||||
|
donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress})
|
||||||
|
dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee})
|
||||||
|
|
||||||
|
// flags for disabling features
|
||||||
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
|
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
|
||||||
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
|
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
|
||||||
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
|
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
|
||||||
|
@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
|
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
|
||||||
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
|
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
|
||||||
|
|
||||||
|
// search command arguments
|
||||||
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
|
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
|
||||||
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
|
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
|
||||||
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
|
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
|
||||||
|
@ -177,27 +277,40 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||||
*jsonRPCPort = DefaultJSONRPCPort
|
*jsonRPCPort = DefaultJSONRPCPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
|
||||||
|
|
||||||
args := &Args{
|
args := &Args{
|
||||||
CmdType: SearchCmd,
|
CmdType: SearchCmd,
|
||||||
Host: *host,
|
Host: *host,
|
||||||
Port: *port,
|
Port: *port,
|
||||||
DBPath: *dbPath,
|
DBPath: *dbPath,
|
||||||
Chain: chain,
|
Chain: chain,
|
||||||
EsHost: *esHost,
|
EsHost: *esHost,
|
||||||
EsPort: *esPort,
|
EsPort: *esPort,
|
||||||
PrometheusPort: *prometheusPort,
|
PrometheusPort: *prometheusPort,
|
||||||
NotifierPort: *notifierPort,
|
NotifierPort: *notifierPort,
|
||||||
JSONRPCPort: *jsonRPCPort,
|
JSONRPCPort: *jsonRPCPort,
|
||||||
JSONRPCHTTPPort: *jsonRPCHTTPPort,
|
JSONRPCHTTPPort: *jsonRPCHTTPPort,
|
||||||
MaxSessions: *maxSessions,
|
MaxSessions: *maxSessions,
|
||||||
SessionTimeout: *sessionTimeout,
|
SessionTimeout: *sessionTimeout,
|
||||||
EsIndex: *esIndex,
|
EsIndex: *esIndex,
|
||||||
RefreshDelta: *refreshDelta,
|
RefreshDelta: *refreshDelta,
|
||||||
CacheTTL: *cacheTTL,
|
CacheTTL: *cacheTTL,
|
||||||
PeerFile: *peerFile,
|
PeerFile: *peerFile,
|
||||||
Country: *country,
|
Banner: banner,
|
||||||
BlockingChannelIds: *blockingChannelIds,
|
Country: *country,
|
||||||
FilteringChannelIds: *filteringChannelIds,
|
BlockingChannelIds: *blockingChannelIds,
|
||||||
|
FilteringChannelIds: *filteringChannelIds,
|
||||||
|
|
||||||
|
GenesisHash: "",
|
||||||
|
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||||
|
ProtocolMin: PROTOCOL_MIN,
|
||||||
|
ProtocolMax: PROTOCOL_MAX,
|
||||||
|
ServerDescription: *serverDescription,
|
||||||
|
PaymentAddress: *paymentAddress,
|
||||||
|
DonationAddress: *donationAddress,
|
||||||
|
DailyFee: *dailyFee,
|
||||||
|
|
||||||
Debug: *debug,
|
Debug: *debug,
|
||||||
DisableEs: *disableEs,
|
DisableEs: *disableEs,
|
||||||
DisableLoadPeers: *disableLoadPeers,
|
DisableLoadPeers: *disableLoadPeers,
|
||||||
|
|
|
@ -12,7 +12,8 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/internal/metrics"
|
"github.com/lbryio/herald.go/internal/metrics"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
server "github.com/lbryio/herald.go/server"
|
"github.com/lbryio/herald.go/server"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
@ -44,43 +45,11 @@ func removeFile(fileName string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeDefaultArgs creates a default set of arguments for testing the server.
|
|
||||||
func makeDefaultArgs() *server.Args {
|
|
||||||
args := &server.Args{
|
|
||||||
CmdType: server.ServeCmd,
|
|
||||||
Host: server.DefaultHost,
|
|
||||||
Port: server.DefaultPort,
|
|
||||||
DBPath: server.DefaultDBPath,
|
|
||||||
EsHost: server.DefaultEsHost,
|
|
||||||
EsPort: server.DefaultEsPort,
|
|
||||||
PrometheusPort: server.DefaultPrometheusPort,
|
|
||||||
NotifierPort: server.DefaultNotifierPort,
|
|
||||||
JSONRPCPort: server.DefaultJSONRPCPort,
|
|
||||||
EsIndex: server.DefaultEsIndex,
|
|
||||||
RefreshDelta: server.DefaultRefreshDelta,
|
|
||||||
CacheTTL: server.DefaultCacheTTL,
|
|
||||||
PeerFile: server.DefaultPeerFile,
|
|
||||||
Country: server.DefaultCountry,
|
|
||||||
DisableEs: true,
|
|
||||||
Debug: true,
|
|
||||||
DisableLoadPeers: true,
|
|
||||||
DisableStartPrometheus: true,
|
|
||||||
DisableStartUDP: true,
|
|
||||||
DisableWritePeers: true,
|
|
||||||
DisableRocksDBRefresh: true,
|
|
||||||
DisableResolve: true,
|
|
||||||
DisableBlockingAndFiltering: true,
|
|
||||||
DisableStartNotifier: true,
|
|
||||||
DisableStartJSONRPC: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
return args
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestAddPeer tests the ability to add peers
|
// TestAddPeer tests the ability to add peers
|
||||||
func TestAddPeer(t *testing.T) {
|
func TestAddPeer(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
|
args := server.MakeDefaultTestArgs()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) {
|
||||||
|
|
||||||
// TestPeerWriter tests that peers get written properly
|
// TestPeerWriter tests that peers get written properly
|
||||||
func TestPeerWriter(t *testing.T) {
|
func TestPeerWriter(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
|
args := server.MakeDefaultTestArgs()
|
||||||
args.DisableWritePeers = false
|
args.DisableWritePeers = false
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint tests the ability to add peers
|
// TestAddPeerEndpoint tests the ability to add peers
|
||||||
func TestAddPeerEndpoint(t *testing.T) {
|
func TestAddPeerEndpoint(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
args2 := makeDefaultArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
|
args2 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = "50052"
|
args2.Port = "50052"
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint2 tests the ability to add peers
|
// TestAddPeerEndpoint2 tests the ability to add peers
|
||||||
func TestAddPeerEndpoint2(t *testing.T) {
|
func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
args2 := makeDefaultArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args3 := makeDefaultArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
|
args3 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = "50052"
|
args2.Port = "50052"
|
||||||
args3.Port = "50053"
|
args3.Port = "50053"
|
||||||
|
|
||||||
|
@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeerEndpoint3 tests the ability to add peers
|
// TestAddPeerEndpoint3 tests the ability to add peers
|
||||||
func TestAddPeerEndpoint3(t *testing.T) {
|
func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
args2 := makeDefaultArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args3 := makeDefaultArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
|
args3 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = "50052"
|
args2.Port = "50052"
|
||||||
args3.Port = "50053"
|
args3.Port = "50053"
|
||||||
|
|
||||||
|
@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
||||||
|
|
||||||
// TestAddPeer tests the ability to add peers
|
// TestAddPeer tests the ability to add peers
|
||||||
func TestUDPServer(t *testing.T) {
|
func TestUDPServer(t *testing.T) {
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
ctx := stop.NewDebug()
|
||||||
|
args := server.MakeDefaultTestArgs()
|
||||||
args.DisableStartUDP = false
|
args.DisableStartUDP = false
|
||||||
args2 := makeDefaultArgs()
|
args2 := server.MakeDefaultTestArgs()
|
||||||
args2.Port = "50052"
|
args2.Port = "50052"
|
||||||
args2.DisableStartUDP = false
|
args2.DisableStartUDP = false
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead
|
||||||
type BlockGetServerHeightReq struct{}
|
type BlockGetServerHeightReq struct{}
|
||||||
type BlockGetServerHeightResp uint32
|
type BlockGetServerHeightResp uint32
|
||||||
|
|
||||||
|
// blockchain.block.get_server_height
|
||||||
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
|
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
|
||||||
if s.DB == nil || s.DB.LastState == nil {
|
if s.DB == nil || s.DB.LastState == nil {
|
||||||
return fmt.Errorf("unknown height")
|
return fmt.Errorf("unknown height")
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/lbryio/lbcd/chaincfg"
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
"github.com/lbryio/lbcd/txscript"
|
"github.com/lbryio/lbcd/txscript"
|
||||||
"github.com/lbryio/lbcutil"
|
"github.com/lbryio/lbcutil"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
|
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
|
||||||
|
@ -57,8 +58,9 @@ var regTestAddrs = [30]string{
|
||||||
|
|
||||||
func TestServerGetHeight(t *testing.T) {
|
func TestServerGetHeight(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) {
|
||||||
|
|
||||||
func TestGetChunk(t *testing.T) {
|
func TestGetChunk(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) {
|
||||||
|
|
||||||
func TestGetHeader(t *testing.T) {
|
func TestGetHeader(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) {
|
||||||
|
|
||||||
func TestHeaders(t *testing.T) {
|
func TestHeaders(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadersSubscribe(t *testing.T) {
|
func TestHeadersSubscribe(t *testing.T) {
|
||||||
|
args := MakeDefaultTestArgs()
|
||||||
|
grp := stop.NewDebug()
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
defer toDefer()
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
|
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) {
|
||||||
// Set up logic to read a notification.
|
// Set up logic to read a notification.
|
||||||
var received sync.WaitGroup
|
var received sync.WaitGroup
|
||||||
recv := func(client net.Conn) {
|
recv := func(client net.Conn) {
|
||||||
|
defer received.Done()
|
||||||
buf := make([]byte, 1024)
|
buf := make([]byte, 1024)
|
||||||
len, err := client.Read(buf)
|
len, err := client.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("read err: %v", err)
|
t.Errorf("read err: %v", err)
|
||||||
}
|
}
|
||||||
t.Logf("len: %v notification: %v", len, string(buf))
|
t.Logf("len: %v notification: %v", len, string(buf))
|
||||||
received.Done()
|
|
||||||
}
|
}
|
||||||
received.Add(2)
|
received.Add(2)
|
||||||
go recv(client1)
|
go recv(client1)
|
||||||
|
@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) {
|
||||||
|
|
||||||
func TestGetBalance(t *testing.T) {
|
func TestGetBalance(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
`defer db.Shutdown()` to match style elsewhere.
|
|||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) {
|
||||||
|
|
||||||
func TestGetHistory(t *testing.T) {
|
func TestGetHistory(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) {
|
||||||
|
|
||||||
func TestListUnspent(t *testing.T) {
|
func TestListUnspent(t *testing.T) {
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
grp := stop.NewDebug()
|
||||||
defer toDefer()
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddressSubscribe(t *testing.T) {
|
func TestAddressSubscribe(t *testing.T) {
|
||||||
|
args := MakeDefaultTestArgs()
|
||||||
|
grp := stop.NewDebug()
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||||
defer toDefer()
|
defer db.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
|
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
||||||
sm.start()
|
sm.start()
|
||||||
defer sm.stop()
|
defer sm.stop()
|
||||||
|
|
||||||
|
|
89
server/jsonrpc_server.go
Normal file
89
server/jsonrpc_server.go
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerService struct {
|
||||||
|
Args *Args
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerFeatureService struct {
|
||||||
|
Args *Args
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerFeaturesReq struct{}
|
||||||
|
|
||||||
|
type ServerFeaturesRes struct {
|
||||||
|
Hosts map[string]string `json:"hosts"`
|
||||||
|
Pruning string `json:"pruning"`
|
||||||
|
ServerVersion string `json:"server_version"`
|
||||||
|
ProtocolMin string `json:"protocol_min"`
|
||||||
|
ProtocolMax string `json:"protocol_max"`
|
||||||
|
GenesisHash string `json:"genesis_hash"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
PaymentAddress string `json:"payment_address"`
|
||||||
|
DonationAddress string `json:"donation_address"`
|
||||||
|
DailyFee string `json:"daily_fee"`
|
||||||
|
HashFunction string `json:"hash_function"`
|
||||||
|
TrendingAlgorithm string `json:"trending_algorithm"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Features is the json rpc endpoint for 'server.features'.
|
||||||
|
func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
|
||||||
|
log.Println("Features")
|
||||||
|
|
||||||
|
features := &ServerFeaturesRes{
|
||||||
|
Hosts: map[string]string{},
|
||||||
|
Pruning: "",
|
||||||
|
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||||
|
ProtocolMin: PROTOCOL_MIN,
|
||||||
|
ProtocolMax: PROTOCOL_MAX,
|
||||||
|
GenesisHash: t.Args.GenesisHash,
|
||||||
|
Description: t.Args.ServerDescription,
|
||||||
|
PaymentAddress: t.Args.PaymentAddress,
|
||||||
|
DonationAddress: t.Args.DonationAddress,
|
||||||
|
DailyFee: t.Args.DailyFee,
|
||||||
|
HashFunction: "sha256",
|
||||||
|
TrendingAlgorithm: "fast_ar",
|
||||||
|
}
|
||||||
|
*res = features
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerBannerService struct {
|
||||||
|
Args *Args
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerBannerReq struct{}
|
||||||
|
|
||||||
|
type ServerBannerRes string
|
||||||
|
|
||||||
|
// Banner is the json rpc endpoint for 'server.banner'.
|
||||||
|
func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
|
||||||
|
log.Println("Banner")
|
||||||
|
|
||||||
|
*res = (*ServerBannerRes)(t.Args.Banner)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerVersionService struct {
|
||||||
|
Args *Args
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerVersionReq struct{}
|
||||||
|
|
||||||
|
type ServerVersionRes string
|
||||||
|
|
||||||
|
// Banner is the json rpc endpoint for 'server.version'.
|
||||||
|
// FIXME: This should return a struct with the version and the protocol version.
|
||||||
|
// <<-- that comment was written by github, scary shit because it's true
|
||||||
|
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
|
||||||
|
log.Println("Version")
|
||||||
|
|
||||||
|
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -121,6 +121,14 @@ fail1:
|
||||||
goto fail2
|
goto fail2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register "server.{features,banner,version}" handlers.
|
||||||
|
serverSvc := &ServerService{s.Args}
|
||||||
|
err = s1.RegisterTCPService(serverSvc, "server")
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("RegisterTCPService: %v\n", err)
|
||||||
|
goto fail2
|
||||||
|
}
|
||||||
|
|
||||||
r := gorilla_mux.NewRouter()
|
r := gorilla_mux.NewRouter()
|
||||||
r.Handle("/rpc", s1)
|
r.Handle("/rpc", s1)
|
||||||
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)
|
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package server_test
|
package server_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
@ -10,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/herald.go/server"
|
"github.com/lbryio/herald.go/server"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNotifierServer(t *testing.T) {
|
func TestNotifierServer(t *testing.T) {
|
||||||
args := makeDefaultArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
ctx := context.Background()
|
// ctx := context.Background()
|
||||||
|
ctx := stop.NewDebug()
|
||||||
hub := server.MakeHubServer(ctx, args)
|
hub := server.MakeHubServer(ctx, args)
|
||||||
|
|
||||||
go hub.NotifierServer()
|
go hub.NotifierServer()
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
server "github.com/lbryio/herald.go/server"
|
server "github.com/lbryio/herald.go/server"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/olivere/elastic/v7"
|
"github.com/olivere/elastic/v7"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -55,13 +56,14 @@ func TestSearch(t *testing.T) {
|
||||||
w.Write([]byte(resp))
|
w.Write([]byte(resp))
|
||||||
}
|
}
|
||||||
|
|
||||||
context := context.Background()
|
ctx := context.Background()
|
||||||
args := makeDefaultArgs()
|
stopGroup := stop.NewDebug()
|
||||||
hubServer := server.MakeHubServer(context, args)
|
args := server.MakeDefaultTestArgs()
|
||||||
|
hubServer := server.MakeHubServer(stopGroup, args)
|
||||||
req := &pb.SearchRequest{
|
req := &pb.SearchRequest{
|
||||||
Text: "asdf",
|
Text: "asdf",
|
||||||
}
|
}
|
||||||
out, err := hubServer.Search(context, req)
|
out, err := hubServer.Search(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/meta"
|
"github.com/lbryio/herald.go/meta"
|
||||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||||
"github.com/lbryio/lbcd/chaincfg"
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
"github.com/olivere/elastic/v7"
|
"github.com/olivere/elastic/v7"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
@ -53,6 +54,7 @@ type Server struct {
|
||||||
HeightSubs map[net.Addr]net.Conn
|
HeightSubs map[net.Addr]net.Conn
|
||||||
HeightSubsMut sync.RWMutex
|
HeightSubsMut sync.RWMutex
|
||||||
NotifierChan chan interface{}
|
NotifierChan chan interface{}
|
||||||
|
Grp *stop.Group
|
||||||
sessionManager *sessionManager
|
sessionManager *sessionManager
|
||||||
pb.UnimplementedHubServer
|
pb.UnimplementedHubServer
|
||||||
}
|
}
|
||||||
|
@ -149,7 +151,7 @@ func (s *Server) Run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
|
||||||
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
|
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Info(err)
|
logrus.Info(err)
|
||||||
|
@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Info(err)
|
logrus.Info(err)
|
||||||
}
|
}
|
||||||
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
|
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
|
||||||
// dbShutdown = func() {
|
|
||||||
// db.Shutdown(myDB)
|
|
||||||
// }
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Can't load the db, fail loudly
|
// Can't load the db, fail loudly
|
||||||
logrus.Info(err)
|
logrus.Info(err)
|
||||||
|
@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
||||||
// MakeHubServer takes the arguments given to a hub when it's started and
|
// MakeHubServer takes the arguments given to a hub when it's started and
|
||||||
// initializes everything. It loads information about previously known peers,
|
// initializes everything. It loads information about previously known peers,
|
||||||
// creates needed internal data structures, and initializes goroutines.
|
// creates needed internal data structures, and initializes goroutines.
|
||||||
func MakeHubServer(ctx context.Context, args *Args) *Server {
|
func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||||
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
|
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
|
||||||
|
|
||||||
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
|
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
|
||||||
|
@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||||
|
|
||||||
//TODO: is this the right place to load the db?
|
//TODO: is this the right place to load the db?
|
||||||
var myDB *db.ReadOnlyDBColumnFamily
|
var myDB *db.ReadOnlyDBColumnFamily
|
||||||
// var dbShutdown = func() {}
|
|
||||||
if !args.DisableResolve {
|
if !args.DisableResolve {
|
||||||
myDB, err = LoadDatabase(args)
|
myDB, err = LoadDatabase(args, grp)
|
||||||
Not needed. Not needed.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warning(err)
|
logrus.Warning(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ahaa. So this is how it gets set. You might consider passing Ahaa. So this is how it gets set. You might consider passing `grp` down into `LoadDatabase()`. So it gets initialized ASAP. Also, don't forget the `ReadonlyDBColumnFamilies` instance constructed in db_test.go.
|
|||||||
|
|
||||||
|
// Determine which chain to use based on db and cli values
|
||||||
dbChain := (*chaincfg.Params)(nil)
|
dbChain := (*chaincfg.Params)(nil)
|
||||||
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
|
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
|
||||||
// The chain params can be inferred from DBStateValue.
|
// The chain params can be inferred from DBStateValue.
|
||||||
|
@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||||
}
|
}
|
||||||
logrus.Infof("network: %v", chain.Name)
|
logrus.Infof("network: %v", chain.Name)
|
||||||
|
|
||||||
|
args.GenesisHash = chain.GenesisHash.String()
|
||||||
|
|
||||||
|
sessionGrp := stop.New(grp)
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
GrpcServer: grpcServer,
|
GrpcServer: grpcServer,
|
||||||
Args: args,
|
Args: args,
|
||||||
|
@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||||
HeightSubs: make(map[net.Addr]net.Conn),
|
HeightSubs: make(map[net.Addr]net.Conn),
|
||||||
HeightSubsMut: sync.RWMutex{},
|
HeightSubsMut: sync.RWMutex{},
|
||||||
NotifierChan: make(chan interface{}),
|
NotifierChan: make(chan interface{}),
|
||||||
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout),
|
Grp: grp,
|
||||||
|
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start up our background services
|
// Start up our background services
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/lbryio/herald.go/db"
|
||||||
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
|
)
|
||||||
|
|
||||||
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
|
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
|
||||||
return s.addPeer
|
return s.addPeer
|
||||||
}
|
}
|
||||||
|
@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
|
||||||
func (s *Server) GetNumPeersExported() func() int64 {
|
func (s *Server) GetNumPeersExported() func() int64 {
|
||||||
return s.getNumPeers
|
return s.getNumPeers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||||
|
return newSessionManager(db, args, grp, chain)
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/lbryio/herald.go/db"
|
"github.com/lbryio/herald.go/db"
|
||||||
"github.com/lbryio/herald.go/internal"
|
"github.com/lbryio/herald.go/internal"
|
||||||
"github.com/lbryio/lbcd/chaincfg"
|
"github.com/lbryio/lbcd/chaincfg"
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -114,13 +115,15 @@ type sessionMap map[uintptr]*session
|
||||||
|
|
||||||
type sessionManager struct {
|
type sessionManager struct {
|
||||||
// sessionsMut protects sessions, headerSubs, hashXSubs state
|
// sessionsMut protects sessions, headerSubs, hashXSubs state
|
||||||
sessionsMut sync.RWMutex
|
sessionsMut sync.RWMutex
|
||||||
sessions sessionMap
|
sessions sessionMap
|
||||||
sessionsWait sync.WaitGroup
|
// sessionsWait sync.WaitGroup
|
||||||
|
grp *stop.Group
|
||||||
sessionsMax int
|
sessionsMax int
|
||||||
sessionTimeout time.Duration
|
sessionTimeout time.Duration
|
||||||
manageTicker *time.Ticker
|
manageTicker *time.Ticker
|
||||||
db *db.ReadOnlyDBColumnFamily
|
db *db.ReadOnlyDBColumnFamily
|
||||||
|
args *Args
|
||||||
chain *chaincfg.Params
|
chain *chaincfg.Params
|
||||||
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
||||||
headerSubs sessionMap
|
headerSubs sessionMap
|
||||||
|
@ -128,13 +131,15 @@ type sessionManager struct {
|
||||||
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
|
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||||
return &sessionManager{
|
return &sessionManager{
|
||||||
sessions: make(sessionMap),
|
sessions: make(sessionMap),
|
||||||
sessionsMax: sessionsMax,
|
grp: grp,
|
||||||
sessionTimeout: time.Duration(sessionTimeout) * time.Second,
|
sessionsMax: args.MaxSessions,
|
||||||
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
|
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
|
||||||
|
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
|
||||||
db: db,
|
db: db,
|
||||||
|
args: args,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
headerSubs: make(sessionMap),
|
headerSubs: make(sessionMap),
|
||||||
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
||||||
|
@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sessionManager) start() {
|
func (sm *sessionManager) start() {
|
||||||
|
sm.grp.Add(1)
|
||||||
go sm.manage()
|
go sm.manage()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,7 +174,13 @@ func (sm *sessionManager) manage() {
|
||||||
}
|
}
|
||||||
sm.sessionsMut.Unlock()
|
sm.sessionsMut.Unlock()
|
||||||
// Wait for next management clock tick.
|
// Wait for next management clock tick.
|
||||||
<-sm.manageTicker.C
|
select {
|
||||||
|
case <-sm.grp.Ch():
|
||||||
|
sm.grp.Done()
|
||||||
|
return
|
||||||
|
case <-sm.manageTicker.C:
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
|
||||||
// each request and update subscriptions.
|
// each request and update subscriptions.
|
||||||
s1 := rpc.NewServer()
|
s1 := rpc.NewServer()
|
||||||
|
|
||||||
|
// Register "server.{features,banner,version}" handlers.
|
||||||
|
serverSvc := &ServerService{sm.args}
|
||||||
|
err := s1.RegisterName("server", serverSvc)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("RegisterName: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Register "blockchain.claimtrie.*"" handlers.
|
// Register "blockchain.claimtrie.*"" handlers.
|
||||||
claimtrieSvc := &ClaimtrieService{sm.db}
|
claimtrieSvc := &ClaimtrieService{sm.db}
|
||||||
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("RegisterService: %v\n", err)
|
log.Errorf("RegisterName: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register other "blockchain.{block,address,scripthash}.*" handlers.
|
// Register other "blockchain.{block,address,scripthash}.*" handlers.
|
||||||
|
@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
|
||||||
goto fail
|
goto fail
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.sessionsWait.Add(1)
|
sm.grp.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
|
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
|
||||||
log.Infof("session %v goroutine exit", sess.addr.String())
|
log.Infof("session %v goroutine exit", sess.addr.String())
|
||||||
sm.sessionsWait.Done()
|
sm.grp.Done()
|
||||||
}()
|
}()
|
||||||
return sess
|
return sess
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
// TestUDPPing tests UDPPing correctness against prod server.
|
// TestUDPPing tests UDPPing correctness against prod server.
|
||||||
func TestUDPPing(t *testing.T) {
|
func TestUDPPing(t *testing.T) {
|
||||||
args := makeDefaultArgs()
|
args := server.MakeDefaultTestArgs()
|
||||||
args.DisableStartUDP = true
|
args.DisableStartUDP = true
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
|
|
@ -8,12 +8,13 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// shutdownRequestChannel is used to initiate shutdown from one of the
|
// shutdownRequestChannel is used to initiate shutdown from one of the
|
||||||
// subsystems using the same code paths as when an interrupt signal is received.
|
// subsystems using the same code paths as when an interrupt signal is received.
|
||||||
var shutdownRequestChannel = make(chan struct{})
|
var shutdownRequestChannel = make(stop.Chan)
|
||||||
|
|
||||||
// interruptSignals defines the default signals to catch in order to do a proper
|
// interruptSignals defines the default signals to catch in order to do a proper
|
||||||
// shutdown. This may be modified during init depending on the platform.
|
// shutdown. This may be modified during init depending on the platform.
|
||||||
|
|
|
@ -10,9 +10,12 @@ package main
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||||
)
|
)
|
||||||
|
|
||||||
// initsignals sets the signals to be caught by the signal handler
|
// initsignals sets the signals to be caught by the signal handler
|
||||||
func initsignals() {
|
func initsignals(stopCh stop.Chan) {
|
||||||
|
shutdownRequestChannel = stopCh
|
||||||
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
OpenIterator
s,ItMut
not needed anymore.