Server endpoints goroutine refactor #69
21 changed files with 546 additions and 307 deletions
76
db/db.go
76
db/db.go
|
@ -8,7 +8,6 @@ import (
|
|||
"encoding/hex"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/herald.go/db/prefixes"
|
||||
|
@ -16,6 +15,7 @@ import (
|
|||
"github.com/lbryio/herald.go/internal"
|
||||
"github.com/lbryio/herald.go/internal/metrics"
|
||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/linxGnu/grocksdb"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct {
|
|||
BlockedChannels map[string][]byte
|
||||
FilteredStreams map[string][]byte
|
||||
FilteredChannels map[string][]byte
|
||||
|
||||
OpenIterators map[string][]chan struct{}
|
||||
ItMut sync.RWMutex
|
||||
ShutdownChan chan struct{}
|
||||
DoneChan chan struct{}
|
||||
ShutdownCalled bool
|
||||
Grp *stop.Group
|
||||
Cleanup func()
|
||||
}
|
||||
|
||||
|
@ -339,19 +335,10 @@ func interruptRequested(interrupted <-chan struct{}) bool {
|
|||
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||
ch := make(chan *prefixes.PrefixRowKV)
|
||||
|
||||
iterKey := fmt.Sprintf("%p", opts)
|
||||
if opts.DB != nil {
|
||||
opts.DB.ItMut.Lock()
|
||||
// There is a tiny chance that we were wating on the above lock while shutdown was
|
||||
// being called and by the time we get it the db has already notified all active
|
||||
// iterators to shutdown. In this case we go to the else branch.
|
||||
if !opts.DB.ShutdownCalled {
|
||||
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
|
||||
opts.DB.ItMut.Unlock()
|
||||
} else {
|
||||
opts.DB.ItMut.Unlock()
|
||||
return ch
|
||||
}
|
||||
// Check if we've been told to shutdown in between getting created and getting here
|
||||
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
|
||||
opts.Grp.Done()
|
||||
This simplification eliminating `if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {`
This simplification eliminating `ShutdownCalled` is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in `opts.WithDB()`
|
||||
return ch
|
||||
}
|
||||
|
||||
ro := grocksdb.NewDefaultReadOptions()
|
||||
|
@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
|||
it.Close()
|
||||
close(ch)
|
||||
ro.Destroy()
|
||||
if opts.DB != nil {
|
||||
opts.DoneChan <- struct{}{}
|
||||
opts.DB.ItMut.Lock()
|
||||
delete(opts.DB.OpenIterators, iterKey)
|
||||
opts.DB.ItMut.Unlock()
|
||||
if opts.Grp != nil {
|
||||
// opts.Grp.DoneNamed(iterKey)
|
||||
opts.Grp.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
|||
if kv = opts.ReadRow(&prevKey); kv != nil {
|
||||
ch <- kv
|
||||
}
|
||||
if interruptRequested(opts.ShutdownChan) {
|
||||
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
|
|||
}
|
||||
|
||||
// GetProdDB returns a db that is used for production.
|
||||
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
|
||||
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
|
||||
prefixNames := prefixes.GetPrefixes()
|
||||
// additional prefixes that aren't in the code explicitly
|
||||
cfNames := []string{"default", "e", "d", "c"}
|
||||
|
@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
|||
cfNames = append(cfNames, cfName)
|
||||
}
|
||||
|
||||
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
|
||||
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
|
||||
|
||||
cleanupFiles := func() {
|
||||
err = os.RemoveAll(secondaryPath)
|
||||
|
@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, cleanupFiles, err
|
||||
cleanupFiles()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cleanupDB := func() {
|
||||
|
@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
|||
}
|
||||
db.Cleanup = cleanupDB
|
||||
|
||||
return db, cleanupDB, nil
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
|
||||
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
|
||||
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
|
||||
opts := grocksdb.NewDefaultOptions()
|
||||
roOpts := grocksdb.NewDefaultReadOptions()
|
||||
cfOpt := grocksdb.NewDefaultOptions()
|
||||
|
@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
|
|||
LastState: nil,
|
||||
Height: 0,
|
||||
Headers: nil,
|
||||
OpenIterators: make(map[string][]chan struct{}),
|
||||
ItMut: sync.RWMutex{},
|
||||
ShutdownChan: make(chan struct{}, 1),
|
||||
ShutdownCalled: false,
|
||||
DoneChan: make(chan struct{}, 1),
|
||||
Grp: grp,
|
||||
}
|
||||
|
||||
err = myDB.ReadDBState() //TODO: Figure out right place for this
|
||||
|
@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
|
|||
|
||||
// Shutdown shuts down the db.
|
||||
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
||||
log.Println("Sending message to ShutdownChan...")
|
||||
db.ShutdownChan <- struct{}{}
|
||||
log.Println("Locking iterator mutex...")
|
||||
db.ItMut.Lock()
|
||||
log.Println("Setting ShutdownCalled to true...")
|
||||
db.ShutdownCalled = true
|
||||
log.Println("Notifying iterators to shutdown...")
|
||||
for _, it := range db.OpenIterators {
|
||||
it[1] <- struct{}{}
|
||||
}
|
||||
log.Println("Waiting for iterators to shutdown...")
|
||||
for _, it := range db.OpenIterators {
|
||||
<-it[0]
|
||||
}
|
||||
log.Println("Unlocking iterator mutex...")
|
||||
db.ItMut.Unlock()
|
||||
log.Println("Sending message to DoneChan...")
|
||||
<-db.DoneChan
|
||||
db.Grp.StopAndWait()
|
||||
log.Println("Calling cleanup...")
|
||||
db.Cleanup()
|
||||
log.Println("Leaving Shutdown...")
|
||||
|
@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
|||
// to keep the db readonly view up to date and handle reorgs on the
|
||||
// blockchain.
|
||||
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
||||
db.Grp.Add(1)
|
||||
go func() {
|
||||
lastPrint := time.Now()
|
||||
for {
|
||||
|
@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
|||
log.Infof("Error detecting changes: %#v", err)
|
||||
}
|
||||
select {
|
||||
case <-db.ShutdownChan:
|
||||
db.DoneChan <- struct{}{}
|
||||
case <-db.Grp.Ch():
|
||||
db.Grp.Done()
|
||||
return
|
||||
case <-time.After(time.Millisecond * 10):
|
||||
}
|
||||
|
|
119
db/db_test.go
119
db/db_test.go
|
@ -7,12 +7,12 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
dbpkg "github.com/lbryio/herald.go/db"
|
||||
"github.com/lbryio/herald.go/db/prefixes"
|
||||
"github.com/lbryio/herald.go/internal"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/linxGnu/grocksdb"
|
||||
)
|
||||
|
||||
|
@ -21,7 +21,7 @@ import (
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
|
||||
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) {
|
||||
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
|
||||
|
||||
log.Println(filePath)
|
||||
file, err := os.Open(filePath)
|
||||
|
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
reader := csv.NewReader(file)
|
||||
records, err := reader.ReadAll()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
wOpts := grocksdb.NewDefaultWriteOptions()
|
||||
|
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
opts.SetCreateIfMissing(true)
|
||||
db, err := grocksdb.OpenDb(opts, "tmp")
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
|
||||
|
||||
|
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
log.Println(cfName)
|
||||
handle, err := db.CreateColumnFamily(opts, cfName)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
handleMap[cfName] = handle
|
||||
}
|
||||
|
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
for _, record := range records[1:] {
|
||||
cf := record[0]
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
handle := handleMap[string(cf)]
|
||||
key, err := hex.DecodeString(record[1])
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
val, err := hex.DecodeString(record[2])
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
db.PutCF(wOpts, handle, key, val)
|
||||
}
|
||||
|
@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
LastState: nil,
|
||||
Height: 0,
|
||||
Headers: nil,
|
||||
OpenIterators: make(map[string][]chan struct{}),
|
||||
ItMut: sync.RWMutex{},
|
||||
ShutdownChan: make(chan struct{}, 1),
|
||||
DoneChan: make(chan struct{}, 1),
|
||||
ShutdownCalled: false,
|
||||
Grp: stop.New(),
|
||||
Cleanup: toDefer,
|
||||
}
|
||||
|
||||
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
|
||||
|
@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
|
||||
err = myDB.InitTxCounts()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// err = dbpkg.InitHeaders(myDB)
|
||||
|
@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
|
|||
// return nil, nil, nil, err
|
||||
// }
|
||||
|
||||
return myDB, records, toDefer, nil
|
||||
return myDB, records, nil
|
||||
}
|
||||
|
||||
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
|
||||
|
@ -247,17 +244,18 @@ func CatCSV(filePath string) {
|
|||
|
||||
func TestCatFullDB(t *testing.T) {
|
||||
t.Skip("Skipping full db test")
|
||||
grp := stop.New()
|
||||
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
||||
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
||||
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||
// url := "lbry://@lbry"
|
||||
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
||||
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
|
||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
||||
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
|
||||
defer toDefer()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) {
|
|||
// TestOpenFullDB Tests running a resolve on a full db.
|
||||
func TestOpenFullDB(t *testing.T) {
|
||||
t.Skip("Skipping full db test")
|
||||
grp := stop.New()
|
||||
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
|
||||
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
|
||||
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||
|
@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) {
|
|||
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
||||
// url := "lbry://@lbry$1"
|
||||
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
|
||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
||||
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
|
||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
||||
defer toDefer()
|
||||
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) {
|
|||
func TestResolve(t *testing.T) {
|
||||
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
|
||||
filePath := "../testdata/FULL_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
Have ...Ditto all the other places calling Have `OpenAndFillTmpDBColumnFamilies()` assign `db.Cleanup = toDefer` inside to simplify this.
...Ditto all the other places calling `OpenAndFillTmpDBColumnFamilies()`.
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
expandedResolveResult := db.Resolve(url)
|
||||
log.Printf("%#v\n", expandedResolveResult)
|
||||
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
|
||||
|
@ -321,11 +321,11 @@ func TestResolve(t *testing.T) {
|
|||
func TestGetDBState(t *testing.T) {
|
||||
filePath := "../testdata/s_resolve.csv"
|
||||
want := uint32(1072108)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
state, err := db.GetDBState()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
|
|||
// Should be non-existent
|
||||
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
||||
filePath := "../testdata/W_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
count, err := db.GetRepostedCount(channelHash)
|
||||
if err != nil {
|
||||
|
@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
|
|||
// Should be non-existent
|
||||
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
|
||||
filePath := "../testdata/j_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
count, err := db.GetRepostedCount(channelHash)
|
||||
if err != nil {
|
||||
|
@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) {
|
|||
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
|
||||
filePath := "../testdata/V_resolve.csv"
|
||||
// want := uint32(3670)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
res, err := db.GetRepost(channelHash)
|
||||
if err != nil {
|
||||
|
@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
|
|||
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
|
||||
filePath := "../testdata/Z_resolve.csv"
|
||||
want := uint32(3670)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
count, err := db.GetClaimsInChannelCount(channelHash)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
|
|||
var position uint16 = 0
|
||||
filePath := "../testdata/F_resolve.csv"
|
||||
log.Println(filePath)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
|
|||
filePath := "../testdata/F_cat.csv"
|
||||
normalName := "cat"
|
||||
claimId := "0"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
ch := db.ClaimShortIdIter(normalName, claimId)
|
||||
|
||||
|
@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
|
|||
var txNum uint32 = 1456296
|
||||
var position uint16 = 0
|
||||
filePath := "../testdata/G_2.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
val, err := db.GetCachedClaimHash(txNum, position)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
|
|||
var val []byte = nil
|
||||
|
||||
filePath := "../testdata/I_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
val, err = db.GetChannelForClaim(claimHash, txNum, position)
|
||||
if err != nil {
|
||||
|
@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
|
|||
want := uint64(20000006)
|
||||
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
||||
claimHash, _ := hex.DecodeString(claimHashStr)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
db.Height = 999999999
|
||||
|
||||
amount, err := db.GetEffectiveAmount(claimHash, true)
|
||||
|
@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
|
|||
want := uint64(21000006)
|
||||
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
|
||||
claimHash, _ := hex.DecodeString(claimHashStr)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
db.Height = 999999999
|
||||
|
||||
amount, err := db.GetEffectiveAmount(claimHash, false)
|
||||
|
@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
|
|||
t.Error(err)
|
||||
}
|
||||
filePath := "../testdata/a_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
res, err := db.GetSupportAmount(claimHash)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) {
|
|||
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
|
||||
|
||||
filePath := "../testdata/X_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
resHash, err := db.GetTxHash(txNum)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) {
|
|||
txNum := uint32(0x6284e3)
|
||||
position := uint16(0x0)
|
||||
want := uint32(0xa6b65)
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
activation, err := db.GetActivation(txNum, position)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
|
|||
return
|
||||
}
|
||||
filePath := "../testdata/E_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
res, err := db.GetCachedClaimTxo(claimHash, true)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
|
|||
claimName := internal.NormalizeName("@Styxhexenhammer666")
|
||||
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
|
||||
filePath := "../testdata/P_resolve.csv"
|
||||
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer toDefer()
|
||||
|
||||
res, err := db.GetControllingClaim(claimName)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"bytes"
|
||||
|
||||
"github.com/lbryio/herald.go/db/prefixes"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/linxGnu/grocksdb"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -22,12 +23,11 @@ type IterOptions struct {
|
|||
IncludeValue bool
|
||||
RawKey bool
|
||||
RawValue bool
|
||||
ShutdownChan chan struct{}
|
||||
DoneChan chan struct{}
|
||||
DB *ReadOnlyDBColumnFamily
|
||||
CfHandle *grocksdb.ColumnFamilyHandle
|
||||
It *grocksdb.Iterator
|
||||
Serializer *prefixes.SerializationAPI
|
||||
Grp *stop.Group
|
||||
// DB *ReadOnlyDBColumnFamily
|
||||
CfHandle *grocksdb.ColumnFamilyHandle
|
||||
It *grocksdb.Iterator
|
||||
Serializer *prefixes.SerializationAPI
|
||||
}
|
||||
|
||||
// NewIterateOptions creates a defualt options structure for a db iterator.
|
||||
|
@ -43,12 +43,11 @@ func NewIterateOptions() *IterOptions {
|
|||
IncludeValue: false,
|
||||
RawKey: false,
|
||||
RawValue: false,
|
||||
ShutdownChan: make(chan struct{}, 1),
|
||||
DoneChan: make(chan struct{}, 1),
|
||||
DB: nil,
|
||||
CfHandle: nil,
|
||||
It: nil,
|
||||
Serializer: prefixes.ProductionAPI,
|
||||
Grp: nil,
|
||||
// DB: nil,
|
||||
CfHandle: nil,
|
||||
It: nil,
|
||||
Serializer: prefixes.ProductionAPI,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
|
|||
}
|
||||
|
||||
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
|
||||
o.DB = db
|
||||
// o.Grp.AddNamed(1, iterKey)
|
||||
o.Grp = stop.New(db.Grp)
|
||||
o.Grp.Add(1)
|
||||
return o
|
||||
}
|
||||
|
||||
|
|
5
go.mod
5
go.mod
|
@ -24,9 +24,6 @@ require (
|
|||
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
|
||||
)
|
||||
|
||||
require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||
|
@ -43,7 +40,7 @@ require (
|
|||
github.com/prometheus/procfs v0.6.0 // indirect
|
||||
github.com/stretchr/testify v1.7.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
||||
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
|
|||
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
|
||||
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
|
||||
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
|
||||
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
|
||||
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
|
||||
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
|
||||
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
|
||||
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
|
||||
|
@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
|
|||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
|
||||
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
|
||||
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
|
||||
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
|
||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
|
|
12
main.go
12
main.go
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/lbryio/herald.go/internal"
|
||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||
"github.com/lbryio/herald.go/server"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -27,13 +28,16 @@ func main() {
|
|||
|
||||
if args.CmdType == server.ServeCmd {
|
||||
// This will cancel goroutines with the server finishes.
|
||||
ctxWCancel, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
// ctxWCancel, cancel := context.WithCancel(ctx)
|
||||
// defer cancel()
|
||||
stopGroup := stop.New()
|
||||
// defer stopGroup.Stop()
|
||||
|
||||
initsignals()
|
||||
initsignals(stopGroup.Ch())
|
||||
interrupt := interruptListener()
|
||||
|
||||
s := server.MakeHubServer(ctxWCancel, args)
|
||||
// s := server.MakeHubServer(ctxWCancel, args)
|
||||
s := server.MakeHubServer(stopGroup, args)
|
||||
go s.Run()
|
||||
|
||||
defer func() {
|
||||
|
|
|
@ -88,7 +88,7 @@ test_command_with_want
|
|||
|
||||
### blockchain.block.get_chunk
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
|
||||
| jq .result | sed 's/"//g' | head -c 100
|
||||
EOM
|
||||
|
@ -97,7 +97,7 @@ test_command_with_want
|
|||
|
||||
### blockchain.block.get_header
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
|
||||
| jq .result.timestamp
|
||||
EOM
|
||||
|
@ -106,7 +106,7 @@ test_command_with_want
|
|||
|
||||
### blockchain.block.headers
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
|
||||
| jq .result.count
|
||||
EOM
|
||||
|
@ -116,7 +116,7 @@ test_command_with_want
|
|||
## blockchain.claimtrie
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
|
||||
| jq .result.txos[0].tx_hash | sed 's/"//g'
|
||||
EOM
|
||||
|
@ -128,7 +128,7 @@ test_command_with_want
|
|||
### blockchain.address.get_balance
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq .result.confirmed
|
||||
EOM
|
||||
|
@ -138,7 +138,7 @@ test_command_with_want
|
|||
## blockchain.address.get_history
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq '.result.confirmed | length'
|
||||
EOM
|
||||
|
@ -148,7 +148,7 @@ test_command_with_want
|
|||
## blockchain.address.listunspent
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq '.result | length'
|
||||
EOM
|
||||
|
@ -160,7 +160,7 @@ test_command_with_want
|
|||
## blockchain.scripthash.get_mempool
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq .error | sed 's/"//g'
|
||||
EOM
|
||||
|
@ -170,7 +170,7 @@ test_command_with_want
|
|||
## blockchain.scripthash.get_history
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq .error | sed 's/"//g'
|
||||
EOM
|
||||
|
@ -180,13 +180,42 @@ test_command_with_want
|
|||
## blockchain.scripthash.listunspent
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
|
||||
| jq .error | sed 's/"//g'
|
||||
EOM
|
||||
WANT="encoding/hex: invalid byte: U+0047 'G'"
|
||||
test_command_with_want
|
||||
|
||||
## server.banner
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "server.banner", "params":[]}'
|
||||
| jq .result | sed 's/"//g'
|
||||
EOM
|
||||
WANT="You are connected to an 0.107.0 server."
|
||||
test_command_with_want
|
||||
|
||||
## server.version
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "server.version", "params":[]}'
|
||||
| jq .result | sed 's/"//g'
|
||||
EOM
|
||||
WANT="0.107.0"
|
||||
test_command_with_want
|
||||
|
||||
## server.features
|
||||
|
||||
read -r -d '' CMD <<- EOM
|
||||
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
|
||||
--data '{"id": 1, "method": "server.features", "params":[]}'
|
||||
EOM
|
||||
WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}'
|
||||
test_command_with_want
|
||||
|
||||
# metrics endpoint testing
|
||||
|
||||
WANT=0
|
||||
|
|
229
server/args.go
229
server/args.go
|
@ -1,6 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
@ -19,26 +20,37 @@ const (
|
|||
|
||||
// Args struct contains the arguments to the hub server.
|
||||
type Args struct {
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
DBPath string
|
||||
Chain *string
|
||||
EsHost string
|
||||
EsPort string
|
||||
PrometheusPort string
|
||||
NotifierPort string
|
||||
JSONRPCPort int
|
||||
JSONRPCHTTPPort int
|
||||
MaxSessions int
|
||||
SessionTimeout int
|
||||
EsIndex string
|
||||
RefreshDelta int
|
||||
CacheTTL int
|
||||
PeerFile string
|
||||
Country string
|
||||
BlockingChannelIds []string
|
||||
FilteringChannelIds []string
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
DBPath string
|
||||
Chain *string
|
||||
EsHost string
|
||||
EsPort string
|
||||
PrometheusPort string
|
||||
NotifierPort string
|
||||
JSONRPCPort int
|
||||
JSONRPCHTTPPort int
|
||||
MaxSessions int
|
||||
SessionTimeout int
|
||||
EsIndex string
|
||||
RefreshDelta int
|
||||
CacheTTL int
|
||||
PeerFile string
|
||||
Banner *string
|
||||
Country string
|
||||
BlockingChannelIds []string
|
||||
FilteringChannelIds []string
|
||||
|
||||
GenesisHash string
|
||||
ServerVersion string
|
||||
ProtocolMin string
|
||||
ProtocolMax string
|
||||
ServerDescription string
|
||||
PaymentAddress string
|
||||
DonationAddress string
|
||||
DailyFee string
|
||||
|
||||
Debug bool
|
||||
DisableEs bool
|
||||
DisableLoadPeers bool
|
||||
|
@ -54,21 +66,32 @@ type Args struct {
|
|||
}
|
||||
|
||||
const (
|
||||
DefaultHost = "0.0.0.0"
|
||||
DefaultPort = "50051"
|
||||
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
||||
DefaultEsHost = "http://localhost"
|
||||
DefaultEsIndex = "claims"
|
||||
DefaultEsPort = "9200"
|
||||
DefaultPrometheusPort = "2112"
|
||||
DefaultNotifierPort = "18080"
|
||||
DefaultJSONRPCPort = 50001
|
||||
DefaultMaxSessions = 10000
|
||||
DefaultSessionTimeout = 300
|
||||
DefaultRefreshDelta = 5
|
||||
DefaultCacheTTL = 5
|
||||
DefaultPeerFile = "peers.txt"
|
||||
DefaultCountry = "US"
|
||||
DefaultHost = "0.0.0.0"
|
||||
DefaultPort = "50051"
|
||||
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
|
||||
DefaultEsHost = "http://localhost"
|
||||
DefaultEsIndex = "claims"
|
||||
DefaultEsPort = "9200"
|
||||
DefaultPrometheusPort = "2112"
|
||||
DefaultNotifierPort = "18080"
|
||||
DefaultJSONRPCPort = 50001
|
||||
DefaultJSONRPCHTTPPort = 50002
|
||||
DefaultMaxSessions = 10000
|
||||
DefaultSessionTimeout = 300
|
||||
DefaultRefreshDelta = 5
|
||||
DefaultCacheTTL = 5
|
||||
DefaultPeerFile = "peers.txt"
|
||||
DefaultBannerFile = ""
|
||||
DefaultCountry = "US"
|
||||
|
||||
HUB_PROTOCOL_VERSION = "0.107.0"
|
||||
PROTOCOL_MIN = "0.54.0"
|
||||
PROTOCOL_MAX = "0.199.0"
|
||||
DefaultServerDescription = "Herald"
|
||||
DefaultPaymentAddress = ""
|
||||
DefaultDonationAddress = ""
|
||||
DefaultDailyFee = "1.0"
|
||||
|
||||
DefaultDisableLoadPeers = false
|
||||
DefaultDisableStartPrometheus = false
|
||||
DefaultDisableStartUDP = false
|
||||
|
@ -86,6 +109,73 @@ var (
|
|||
DefaultFilteringChannelIds = []string{}
|
||||
)
|
||||
|
||||
func loadBanner(bannerFile *string, serverVersion string) *string {
|
||||
var banner string
|
||||
|
||||
data, err := os.ReadFile(*bannerFile)
|
||||
if err != nil {
|
||||
banner = fmt.Sprintf("You are connected to an %s server.", serverVersion)
|
||||
} else {
|
||||
banner = string(data)
|
||||
}
|
||||
|
||||
/*
|
||||
banner := os.Getenv("BANNER")
|
||||
if banner == "" {
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
|
||||
return &banner
|
||||
}
|
||||
|
||||
// MakeDefaultArgs creates a default set of arguments for testing the server.
|
||||
func MakeDefaultTestArgs() *Args {
|
||||
args := &Args{
|
||||
CmdType: ServeCmd,
|
||||
Host: DefaultHost,
|
||||
Port: DefaultPort,
|
||||
DBPath: DefaultDBPath,
|
||||
EsHost: DefaultEsHost,
|
||||
EsPort: DefaultEsPort,
|
||||
PrometheusPort: DefaultPrometheusPort,
|
||||
NotifierPort: DefaultNotifierPort,
|
||||
JSONRPCPort: DefaultJSONRPCPort,
|
||||
JSONRPCHTTPPort: DefaultJSONRPCHTTPPort,
|
||||
MaxSessions: DefaultMaxSessions,
|
||||
SessionTimeout: DefaultSessionTimeout,
|
||||
EsIndex: DefaultEsIndex,
|
||||
RefreshDelta: DefaultRefreshDelta,
|
||||
CacheTTL: DefaultCacheTTL,
|
||||
PeerFile: DefaultPeerFile,
|
||||
Banner: nil,
|
||||
Country: DefaultCountry,
|
||||
|
||||
GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(),
|
||||
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||
ProtocolMin: PROTOCOL_MIN,
|
||||
ProtocolMax: PROTOCOL_MAX,
|
||||
ServerDescription: DefaultServerDescription,
|
||||
PaymentAddress: DefaultPaymentAddress,
|
||||
DonationAddress: DefaultDonationAddress,
|
||||
DailyFee: DefaultDailyFee,
|
||||
|
||||
DisableEs: true,
|
||||
Debug: true,
|
||||
DisableLoadPeers: true,
|
||||
DisableStartPrometheus: true,
|
||||
DisableStartUDP: true,
|
||||
DisableWritePeers: true,
|
||||
DisableRocksDBRefresh: true,
|
||||
DisableResolve: true,
|
||||
DisableBlockingAndFiltering: true,
|
||||
DisableStartNotifier: true,
|
||||
DisableStartJSONRPC: true,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// GetEnvironment takes the environment variables as an array of strings
|
||||
// and a getkeyval function to turn it into a map.
|
||||
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
|
||||
|
@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string {
|
|||
func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
||||
|
||||
environment := GetEnvironmentStandard()
|
||||
parser := argparse.NewParser("hub", "hub server and client")
|
||||
parser := argparse.NewParser("herald", "herald server and client")
|
||||
|
||||
serveCmd := parser.NewCommand("serve", "start the hub server")
|
||||
searchCmd := parser.NewCommand("search", "claim search")
|
||||
|
@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
return err
|
||||
}
|
||||
|
||||
// main server config arguments
|
||||
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
|
||||
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
|
||||
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
|
||||
|
@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
||||
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
||||
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
|
||||
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort})
|
||||
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
|
||||
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
|
||||
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
|
||||
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
|
||||
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
|
||||
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
|
||||
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
|
||||
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
|
||||
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
|
||||
bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile})
|
||||
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
|
||||
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
|
||||
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
|
||||
|
||||
// arguments for server features
|
||||
serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription})
|
||||
paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress})
|
||||
donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress})
|
||||
dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee})
|
||||
|
||||
// flags for disabling features
|
||||
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
|
||||
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
|
||||
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
|
||||
|
@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
|
||||
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
|
||||
|
||||
// search command arguments
|
||||
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
|
||||
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
|
||||
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
|
||||
|
@ -177,27 +277,40 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
*jsonRPCPort = DefaultJSONRPCPort
|
||||
}
|
||||
|
||||
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
|
||||
|
||||
args := &Args{
|
||||
CmdType: SearchCmd,
|
||||
Host: *host,
|
||||
Port: *port,
|
||||
DBPath: *dbPath,
|
||||
Chain: chain,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
PrometheusPort: *prometheusPort,
|
||||
NotifierPort: *notifierPort,
|
||||
JSONRPCPort: *jsonRPCPort,
|
||||
JSONRPCHTTPPort: *jsonRPCHTTPPort,
|
||||
MaxSessions: *maxSessions,
|
||||
SessionTimeout: *sessionTimeout,
|
||||
EsIndex: *esIndex,
|
||||
RefreshDelta: *refreshDelta,
|
||||
CacheTTL: *cacheTTL,
|
||||
PeerFile: *peerFile,
|
||||
Country: *country,
|
||||
BlockingChannelIds: *blockingChannelIds,
|
||||
FilteringChannelIds: *filteringChannelIds,
|
||||
CmdType: SearchCmd,
|
||||
Host: *host,
|
||||
Port: *port,
|
||||
DBPath: *dbPath,
|
||||
Chain: chain,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
PrometheusPort: *prometheusPort,
|
||||
NotifierPort: *notifierPort,
|
||||
JSONRPCPort: *jsonRPCPort,
|
||||
JSONRPCHTTPPort: *jsonRPCHTTPPort,
|
||||
MaxSessions: *maxSessions,
|
||||
SessionTimeout: *sessionTimeout,
|
||||
EsIndex: *esIndex,
|
||||
RefreshDelta: *refreshDelta,
|
||||
CacheTTL: *cacheTTL,
|
||||
PeerFile: *peerFile,
|
||||
Banner: banner,
|
||||
Country: *country,
|
||||
BlockingChannelIds: *blockingChannelIds,
|
||||
FilteringChannelIds: *filteringChannelIds,
|
||||
|
||||
GenesisHash: "",
|
||||
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||
ProtocolMin: PROTOCOL_MIN,
|
||||
ProtocolMax: PROTOCOL_MAX,
|
||||
ServerDescription: *serverDescription,
|
||||
PaymentAddress: *paymentAddress,
|
||||
DonationAddress: *donationAddress,
|
||||
DailyFee: *dailyFee,
|
||||
|
||||
Debug: *debug,
|
||||
DisableEs: *disableEs,
|
||||
DisableLoadPeers: *disableLoadPeers,
|
||||
|
|
|
@ -12,7 +12,8 @@ import (
|
|||
|
||||
"github.com/lbryio/herald.go/internal/metrics"
|
||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||
server "github.com/lbryio/herald.go/server"
|
||||
"github.com/lbryio/herald.go/server"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -44,43 +45,11 @@ func removeFile(fileName string) {
|
|||
}
|
||||
}
|
||||
|
||||
// makeDefaultArgs creates a default set of arguments for testing the server.
|
||||
func makeDefaultArgs() *server.Args {
|
||||
args := &server.Args{
|
||||
CmdType: server.ServeCmd,
|
||||
Host: server.DefaultHost,
|
||||
Port: server.DefaultPort,
|
||||
DBPath: server.DefaultDBPath,
|
||||
EsHost: server.DefaultEsHost,
|
||||
EsPort: server.DefaultEsPort,
|
||||
PrometheusPort: server.DefaultPrometheusPort,
|
||||
NotifierPort: server.DefaultNotifierPort,
|
||||
JSONRPCPort: server.DefaultJSONRPCPort,
|
||||
EsIndex: server.DefaultEsIndex,
|
||||
RefreshDelta: server.DefaultRefreshDelta,
|
||||
CacheTTL: server.DefaultCacheTTL,
|
||||
PeerFile: server.DefaultPeerFile,
|
||||
Country: server.DefaultCountry,
|
||||
DisableEs: true,
|
||||
Debug: true,
|
||||
DisableLoadPeers: true,
|
||||
DisableStartPrometheus: true,
|
||||
DisableStartUDP: true,
|
||||
DisableWritePeers: true,
|
||||
DisableRocksDBRefresh: true,
|
||||
DisableResolve: true,
|
||||
DisableBlockingAndFiltering: true,
|
||||
DisableStartNotifier: true,
|
||||
DisableStartJSONRPC: true,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// TestAddPeer tests the ability to add peers
|
||||
func TestAddPeer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) {
|
|||
|
||||
// TestPeerWriter tests that peers get written properly
|
||||
func TestPeerWriter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args.DisableWritePeers = false
|
||||
|
||||
tests := []struct {
|
||||
|
@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) {
|
|||
|
||||
// TestAddPeerEndpoint tests the ability to add peers
|
||||
func TestAddPeerEndpoint(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
args2 := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args2 := server.MakeDefaultTestArgs()
|
||||
args2.Port = "50052"
|
||||
|
||||
tests := []struct {
|
||||
|
@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) {
|
|||
|
||||
// TestAddPeerEndpoint2 tests the ability to add peers
|
||||
func TestAddPeerEndpoint2(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
args2 := makeDefaultArgs()
|
||||
args3 := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args2 := server.MakeDefaultTestArgs()
|
||||
args3 := server.MakeDefaultTestArgs()
|
||||
args2.Port = "50052"
|
||||
args3.Port = "50053"
|
||||
|
||||
|
@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
|||
|
||||
// TestAddPeerEndpoint3 tests the ability to add peers
|
||||
func TestAddPeerEndpoint3(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
args2 := makeDefaultArgs()
|
||||
args3 := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args2 := server.MakeDefaultTestArgs()
|
||||
args3 := server.MakeDefaultTestArgs()
|
||||
args2.Port = "50052"
|
||||
args3.Port = "50053"
|
||||
|
||||
|
@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
|||
|
||||
// TestAddPeer tests the ability to add peers
|
||||
func TestUDPServer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args.DisableStartUDP = false
|
||||
args2 := makeDefaultArgs()
|
||||
args2 := server.MakeDefaultTestArgs()
|
||||
args2.Port = "50052"
|
||||
args2.DisableStartUDP = false
|
||||
|
||||
|
|
|
@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead
|
|||
type BlockGetServerHeightReq struct{}
|
||||
type BlockGetServerHeightResp uint32
|
||||
|
||||
// blockchain.block.get_server_height
|
||||
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
|
||||
if s.DB == nil || s.DB.LastState == nil {
|
||||
return fmt.Errorf("unknown height")
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/lbryio/lbcd/chaincfg"
|
||||
"github.com/lbryio/lbcd/txscript"
|
||||
"github.com/lbryio/lbcutil"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
)
|
||||
|
||||
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
|
||||
|
@ -57,8 +58,9 @@ var regTestAddrs = [30]string{
|
|||
|
||||
func TestServerGetHeight(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) {
|
|||
|
||||
func TestGetChunk(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) {
|
|||
|
||||
func TestGetHeader(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) {
|
|||
|
||||
func TestHeaders(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHeadersSubscribe(t *testing.T) {
|
||||
args := MakeDefaultTestArgs()
|
||||
grp := stop.NewDebug()
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
|
||||
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
||||
sm.start()
|
||||
defer sm.stop()
|
||||
|
||||
|
@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) {
|
|||
// Set up logic to read a notification.
|
||||
var received sync.WaitGroup
|
||||
recv := func(client net.Conn) {
|
||||
defer received.Done()
|
||||
buf := make([]byte, 1024)
|
||||
len, err := client.Read(buf)
|
||||
if err != nil {
|
||||
t.Errorf("read err: %v", err)
|
||||
}
|
||||
t.Logf("len: %v notification: %v", len, string(buf))
|
||||
received.Done()
|
||||
}
|
||||
received.Add(2)
|
||||
go recv(client1)
|
||||
|
@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) {
|
|||
|
||||
func TestGetBalance(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
`defer db.Shutdown()` to match style elsewhere.
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) {
|
|||
|
||||
func TestGetHistory(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) {
|
|||
|
||||
func TestListUnspent(t *testing.T) {
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
grp := stop.NewDebug()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAddressSubscribe(t *testing.T) {
|
||||
args := MakeDefaultTestArgs()
|
||||
grp := stop.NewDebug()
|
||||
secondaryPath := "asdf"
|
||||
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
|
||||
defer toDefer()
|
||||
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
|
||||
defer db.Shutdown()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
|
||||
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
|
||||
sm.start()
|
||||
defer sm.stop()
|
||||
|
||||
|
|
89
server/jsonrpc_server.go
Normal file
89
server/jsonrpc_server.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ServerService struct {
|
||||
Args *Args
|
||||
}
|
||||
|
||||
type ServerFeatureService struct {
|
||||
Args *Args
|
||||
}
|
||||
|
||||
type ServerFeaturesReq struct{}
|
||||
|
||||
type ServerFeaturesRes struct {
|
||||
Hosts map[string]string `json:"hosts"`
|
||||
Pruning string `json:"pruning"`
|
||||
ServerVersion string `json:"server_version"`
|
||||
ProtocolMin string `json:"protocol_min"`
|
||||
ProtocolMax string `json:"protocol_max"`
|
||||
GenesisHash string `json:"genesis_hash"`
|
||||
Description string `json:"description"`
|
||||
PaymentAddress string `json:"payment_address"`
|
||||
DonationAddress string `json:"donation_address"`
|
||||
DailyFee string `json:"daily_fee"`
|
||||
HashFunction string `json:"hash_function"`
|
||||
TrendingAlgorithm string `json:"trending_algorithm"`
|
||||
}
|
||||
|
||||
// Features is the json rpc endpoint for 'server.features'.
|
||||
func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
|
||||
log.Println("Features")
|
||||
|
||||
features := &ServerFeaturesRes{
|
||||
Hosts: map[string]string{},
|
||||
Pruning: "",
|
||||
ServerVersion: HUB_PROTOCOL_VERSION,
|
||||
ProtocolMin: PROTOCOL_MIN,
|
||||
ProtocolMax: PROTOCOL_MAX,
|
||||
GenesisHash: t.Args.GenesisHash,
|
||||
Description: t.Args.ServerDescription,
|
||||
PaymentAddress: t.Args.PaymentAddress,
|
||||
DonationAddress: t.Args.DonationAddress,
|
||||
DailyFee: t.Args.DailyFee,
|
||||
HashFunction: "sha256",
|
||||
TrendingAlgorithm: "fast_ar",
|
||||
}
|
||||
*res = features
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ServerBannerService struct {
|
||||
Args *Args
|
||||
}
|
||||
|
||||
type ServerBannerReq struct{}
|
||||
|
||||
type ServerBannerRes string
|
||||
|
||||
// Banner is the json rpc endpoint for 'server.banner'.
|
||||
func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
|
||||
log.Println("Banner")
|
||||
|
||||
*res = (*ServerBannerRes)(t.Args.Banner)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ServerVersionService struct {
|
||||
Args *Args
|
||||
}
|
||||
|
||||
type ServerVersionReq struct{}
|
||||
|
||||
type ServerVersionRes string
|
||||
|
||||
// Banner is the json rpc endpoint for 'server.version'.
|
||||
// FIXME: This should return a struct with the version and the protocol version.
|
||||
// <<-- that comment was written by github, scary shit because it's true
|
||||
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
|
||||
log.Println("Version")
|
||||
|
||||
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -121,6 +121,14 @@ fail1:
|
|||
goto fail2
|
||||
}
|
||||
|
||||
// Register "server.{features,banner,version}" handlers.
|
||||
serverSvc := &ServerService{s.Args}
|
||||
err = s1.RegisterTCPService(serverSvc, "server")
|
||||
if err != nil {
|
||||
log.Errorf("RegisterTCPService: %v\n", err)
|
||||
goto fail2
|
||||
}
|
||||
|
||||
r := gorilla_mux.NewRouter()
|
||||
r.Handle("/rpc", s1)
|
||||
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package server_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
|
@ -10,6 +9,7 @@ import (
|
|||
|
||||
"github.com/lbryio/herald.go/internal"
|
||||
"github.com/lbryio/herald.go/server"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) {
|
|||
}
|
||||
|
||||
func TestNotifierServer(t *testing.T) {
|
||||
args := makeDefaultArgs()
|
||||
ctx := context.Background()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
// ctx := context.Background()
|
||||
ctx := stop.NewDebug()
|
||||
hub := server.MakeHubServer(ctx, args)
|
||||
|
||||
go hub.NotifierServer()
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||
server "github.com/lbryio/herald.go/server"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/olivere/elastic/v7"
|
||||
)
|
||||
|
||||
|
@ -55,13 +56,14 @@ func TestSearch(t *testing.T) {
|
|||
w.Write([]byte(resp))
|
||||
}
|
||||
|
||||
context := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
hubServer := server.MakeHubServer(context, args)
|
||||
ctx := context.Background()
|
||||
stopGroup := stop.NewDebug()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
hubServer := server.MakeHubServer(stopGroup, args)
|
||||
req := &pb.SearchRequest{
|
||||
Text: "asdf",
|
||||
}
|
||||
out, err := hubServer.Search(context, req)
|
||||
out, err := hubServer.Search(ctx, req)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/lbryio/herald.go/meta"
|
||||
pb "github.com/lbryio/herald.go/protobuf/go"
|
||||
"github.com/lbryio/lbcd/chaincfg"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
@ -53,6 +54,7 @@ type Server struct {
|
|||
HeightSubs map[net.Addr]net.Conn
|
||||
HeightSubsMut sync.RWMutex
|
||||
NotifierChan chan interface{}
|
||||
Grp *stop.Group
|
||||
sessionManager *sessionManager
|
||||
pb.UnimplementedHubServer
|
||||
}
|
||||
|
@ -149,7 +151,7 @@ func (s *Server) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
||||
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
|
||||
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
|
||||
if err != nil {
|
||||
logrus.Info(err)
|
||||
|
@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
|||
if err != nil {
|
||||
logrus.Info(err)
|
||||
}
|
||||
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
|
||||
// dbShutdown = func() {
|
||||
// db.Shutdown(myDB)
|
||||
// }
|
||||
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
|
||||
if err != nil {
|
||||
// Can't load the db, fail loudly
|
||||
logrus.Info(err)
|
||||
|
@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
|
|||
// MakeHubServer takes the arguments given to a hub when it's started and
|
||||
// initializes everything. It loads information about previously known peers,
|
||||
// creates needed internal data structures, and initializes goroutines.
|
||||
func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||
func MakeHubServer(grp *stop.Group, args *Args) *Server {
|
||||
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
|
||||
|
||||
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
|
||||
|
@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
|||
|
||||
//TODO: is this the right place to load the db?
|
||||
var myDB *db.ReadOnlyDBColumnFamily
|
||||
// var dbShutdown = func() {}
|
||||
if !args.DisableResolve {
|
||||
myDB, err = LoadDatabase(args)
|
||||
myDB, err = LoadDatabase(args, grp)
|
||||
Not needed. Not needed.
|
||||
if err != nil {
|
||||
logrus.Warning(err)
|
||||
}
|
||||
}
|
||||
Ahaa. So this is how it gets set. You might consider passing Ahaa. So this is how it gets set. You might consider passing `grp` down into `LoadDatabase()`. So it gets initialized ASAP. Also, don't forget the `ReadonlyDBColumnFamilies` instance constructed in db_test.go.
|
||||
|
||||
// Determine which chain to use based on db and cli values
|
||||
dbChain := (*chaincfg.Params)(nil)
|
||||
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
|
||||
// The chain params can be inferred from DBStateValue.
|
||||
|
@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
|||
}
|
||||
logrus.Infof("network: %v", chain.Name)
|
||||
|
||||
args.GenesisHash = chain.GenesisHash.String()
|
||||
|
||||
sessionGrp := stop.New(grp)
|
||||
|
||||
s := &Server{
|
||||
GrpcServer: grpcServer,
|
||||
Args: args,
|
||||
|
@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
|||
HeightSubs: make(map[net.Addr]net.Conn),
|
||||
HeightSubsMut: sync.RWMutex{},
|
||||
NotifierChan: make(chan interface{}),
|
||||
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout),
|
||||
Grp: grp,
|
||||
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
|
||||
}
|
||||
|
||||
// Start up our background services
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/lbryio/herald.go/db"
|
||||
"github.com/lbryio/lbcd/chaincfg"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
)
|
||||
|
||||
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
|
||||
return s.addPeer
|
||||
}
|
||||
|
@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
|
|||
func (s *Server) GetNumPeersExported() func() int64 {
|
||||
return s.getNumPeers
|
||||
}
|
||||
|
||||
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||
return newSessionManager(db, args, grp, chain)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/lbryio/herald.go/db"
|
||||
"github.com/lbryio/herald.go/internal"
|
||||
"github.com/lbryio/lbcd/chaincfg"
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -114,13 +115,15 @@ type sessionMap map[uintptr]*session
|
|||
|
||||
type sessionManager struct {
|
||||
// sessionsMut protects sessions, headerSubs, hashXSubs state
|
||||
sessionsMut sync.RWMutex
|
||||
sessions sessionMap
|
||||
sessionsWait sync.WaitGroup
|
||||
sessionsMut sync.RWMutex
|
||||
sessions sessionMap
|
||||
// sessionsWait sync.WaitGroup
|
||||
grp *stop.Group
|
||||
sessionsMax int
|
||||
sessionTimeout time.Duration
|
||||
manageTicker *time.Ticker
|
||||
db *db.ReadOnlyDBColumnFamily
|
||||
args *Args
|
||||
chain *chaincfg.Params
|
||||
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
||||
headerSubs sessionMap
|
||||
|
@ -128,13 +131,15 @@ type sessionManager struct {
|
|||
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
||||
}
|
||||
|
||||
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
|
||||
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
||||
return &sessionManager{
|
||||
sessions: make(sessionMap),
|
||||
sessionsMax: sessionsMax,
|
||||
sessionTimeout: time.Duration(sessionTimeout) * time.Second,
|
||||
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
|
||||
grp: grp,
|
||||
sessionsMax: args.MaxSessions,
|
||||
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
|
||||
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
|
||||
db: db,
|
||||
args: args,
|
||||
chain: chain,
|
||||
headerSubs: make(sessionMap),
|
||||
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
||||
|
@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
|
|||
}
|
||||
|
||||
func (sm *sessionManager) start() {
|
||||
sm.grp.Add(1)
|
||||
go sm.manage()
|
||||
}
|
||||
|
||||
|
@ -168,7 +174,13 @@ func (sm *sessionManager) manage() {
|
|||
}
|
||||
sm.sessionsMut.Unlock()
|
||||
// Wait for next management clock tick.
|
||||
<-sm.manageTicker.C
|
||||
select {
|
||||
case <-sm.grp.Ch():
|
||||
sm.grp.Done()
|
||||
return
|
||||
case <-sm.manageTicker.C:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
|
|||
// each request and update subscriptions.
|
||||
s1 := rpc.NewServer()
|
||||
|
||||
// Register "server.{features,banner,version}" handlers.
|
||||
serverSvc := &ServerService{sm.args}
|
||||
err := s1.RegisterName("server", serverSvc)
|
||||
if err != nil {
|
||||
log.Errorf("RegisterName: %v\n", err)
|
||||
}
|
||||
|
||||
// Register "blockchain.claimtrie.*"" handlers.
|
||||
claimtrieSvc := &ClaimtrieService{sm.db}
|
||||
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
||||
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
log.Errorf("RegisterName: %v\n", err)
|
||||
}
|
||||
|
||||
// Register other "blockchain.{block,address,scripthash}.*" handlers.
|
||||
|
@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
|
|||
goto fail
|
||||
}
|
||||
|
||||
sm.sessionsWait.Add(1)
|
||||
sm.grp.Add(1)
|
||||
go func() {
|
||||
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
|
||||
log.Infof("session %v goroutine exit", sess.addr.String())
|
||||
sm.sessionsWait.Done()
|
||||
sm.grp.Done()
|
||||
}()
|
||||
return sess
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
// TestUDPPing tests UDPPing correctness against prod server.
|
||||
func TestUDPPing(t *testing.T) {
|
||||
args := makeDefaultArgs()
|
||||
args := server.MakeDefaultTestArgs()
|
||||
args.DisableStartUDP = true
|
||||
|
||||
tests := []struct {
|
||||
|
|
|
@ -8,12 +8,13 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// shutdownRequestChannel is used to initiate shutdown from one of the
|
||||
// subsystems using the same code paths as when an interrupt signal is received.
|
||||
var shutdownRequestChannel = make(chan struct{})
|
||||
var shutdownRequestChannel = make(stop.Chan)
|
||||
|
||||
// interruptSignals defines the default signals to catch in order to do a proper
|
||||
// shutdown. This may be modified during init depending on the platform.
|
||||
|
|
|
@ -10,9 +10,12 @@ package main
|
|||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"github.com/lbryio/lbry.go/v3/extras/stop"
|
||||
)
|
||||
|
||||
// initsignals sets the signals to be caught by the signal handler
|
||||
func initsignals() {
|
||||
func initsignals(stopCh stop.Chan) {
|
||||
shutdownRequestChannel = stopCh
|
||||
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
OpenIterator
s,ItMut
not needed anymore.