Server endpoints goroutine refactor #69

Merged
jeffreypicard merged 18 commits from server-endpoints-goroutine-refactor into master 2022-10-25 07:48:13 +02:00
21 changed files with 546 additions and 307 deletions

View file

@ -8,7 +8,6 @@ import (
"encoding/hex"
"fmt"
"os"
"sync"
"time"
"github.com/lbryio/herald.go/db/prefixes"
@ -16,6 +15,7 @@ import (
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct {
BlockedChannels map[string][]byte
FilteredStreams map[string][]byte
FilteredChannels map[string][]byte
moodyjon commented 2022-10-14 16:14:51 +02:00 (Migrated from github.com)
Review

OpenIterators, ItMut not needed anymore.

`OpenIterator`s, `ItMut` not needed anymore.
OpenIterators map[string][]chan struct{}
ItMut sync.RWMutex
ShutdownChan chan struct{}
DoneChan chan struct{}
ShutdownCalled bool
Grp *stop.Group
Cleanup func()
}
@ -339,20 +335,11 @@ func interruptRequested(interrupted <-chan struct{}) bool {
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
iterKey := fmt.Sprintf("%p", opts)
if opts.DB != nil {
opts.DB.ItMut.Lock()
// There is a tiny chance that we were wating on the above lock while shutdown was
// being called and by the time we get it the db has already notified all active
// iterators to shutdown. In this case we go to the else branch.
if !opts.DB.ShutdownCalled {
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
opts.DB.ItMut.Unlock()
} else {
opts.DB.ItMut.Unlock()
// Check if we've been told to shutdown in between getting created and getting here
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
opts.Grp.Done()
moodyjon commented 2022-10-14 16:02:16 +02:00 (Migrated from github.com)
Review

if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {

This simplification eliminating ShutdownCalled is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in opts.WithDB()

`if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {` This simplification eliminating `ShutdownCalled` is possible (I think) because you register the iterator with the stop group earlier. Now the registration happens in `opts.WithDB()`
return ch
}
}
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(opts.FillCache)
@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
it.Close()
close(ch)
ro.Destroy()
if opts.DB != nil {
opts.DoneChan <- struct{}{}
opts.DB.ItMut.Lock()
delete(opts.DB.OpenIterators, iterKey)
opts.DB.ItMut.Unlock()
if opts.Grp != nil {
// opts.Grp.DoneNamed(iterKey)
opts.Grp.Done()
}
}()
@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
}
if interruptRequested(opts.ShutdownChan) {
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
return
}
}
@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
}
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
cfNames = append(cfNames, cfName)
}
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
cleanupFiles := func() {
err = os.RemoveAll(secondaryPath)
@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
}
if err != nil {
return nil, cleanupFiles, err
cleanupFiles()
return nil, err
}
cleanupDB := func() {
@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
}
db.Cleanup = cleanupDB
return db, cleanupDB, nil
return db, nil
}
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
LastState: nil,
Height: 0,
Headers: nil,
OpenIterators: make(map[string][]chan struct{}),
ItMut: sync.RWMutex{},
ShutdownChan: make(chan struct{}, 1),
ShutdownCalled: false,
DoneChan: make(chan struct{}, 1),
Grp: grp,
}
err = myDB.ReadDBState() //TODO: Figure out right place for this
@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() {
log.Println("Sending message to ShutdownChan...")
db.ShutdownChan <- struct{}{}
log.Println("Locking iterator mutex...")
db.ItMut.Lock()
log.Println("Setting ShutdownCalled to true...")
db.ShutdownCalled = true
log.Println("Notifying iterators to shutdown...")
for _, it := range db.OpenIterators {
it[1] <- struct{}{}
}
log.Println("Waiting for iterators to shutdown...")
for _, it := range db.OpenIterators {
<-it[0]
}
log.Println("Unlocking iterator mutex...")
db.ItMut.Unlock()
log.Println("Sending message to DoneChan...")
<-db.DoneChan
db.Grp.StopAndWait()
log.Println("Calling cleanup...")
db.Cleanup()
log.Println("Leaving Shutdown...")
@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
db.Grp.Add(1)
go func() {
lastPrint := time.Now()
for {
@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
log.Infof("Error detecting changes: %#v", err)
}
select {
case <-db.ShutdownChan:
db.DoneChan <- struct{}{}
case <-db.Grp.Ch():
db.Grp.Done()
return
case <-time.After(time.Millisecond * 10):
}

View file

@ -7,12 +7,12 @@ import (
"log"
"os"
"strings"
"sync"
"testing"
dbpkg "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
)
@ -21,7 +21,7 @@ import (
////////////////////////////////////////////////////////////////////////////////
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) {
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
log.Println(filePath)
file, err := os.Open(filePath)
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
reader := csv.NewReader(file)
records, err := reader.ReadAll()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
wOpts := grocksdb.NewDefaultWriteOptions()
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
opts.SetCreateIfMissing(true)
db, err := grocksdb.OpenDb(opts, "tmp")
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
log.Println(cfName)
handle, err := db.CreateColumnFamily(opts, cfName)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handleMap[cfName] = handle
}
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
for _, record := range records[1:] {
cf := record[0]
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handle := handleMap[string(cf)]
key, err := hex.DecodeString(record[1])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
val, err := hex.DecodeString(record[2])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
db.PutCF(wOpts, handle, key, val)
}
@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
LastState: nil,
Height: 0,
Headers: nil,
OpenIterators: make(map[string][]chan struct{}),
ItMut: sync.RWMutex{},
ShutdownChan: make(chan struct{}, 1),
DoneChan: make(chan struct{}, 1),
ShutdownCalled: false,
Grp: stop.New(),
Cleanup: toDefer,
}
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
err = myDB.InitTxCounts()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// err = dbpkg.InitHeaders(myDB)
@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
// return nil, nil, nil, err
// }
return myDB, records, toDefer, nil
return myDB, records, nil
}
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
@ -247,17 +244,18 @@ func CatCSV(filePath string) {
func TestCatFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
// url := "lbry://@lbry"
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
defer toDefer()
if err != nil {
t.Error(err)
return
@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) {
// TestOpenFullDB Tests running a resolve on a full db.
func TestOpenFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) {
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
// url := "lbry://@lbry$1"
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
defer toDefer()
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) {
func TestResolve(t *testing.T) {
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
filePath := "../testdata/FULL_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
moodyjon commented 2022-10-19 16:18:32 +02:00 (Migrated from github.com)
Review

Have OpenAndFillTmpDBColumnFamilies() assign db.Cleanup = toDefer inside to simplify this.

...Ditto all the other places calling OpenAndFillTmpDBColumnFamilies().

Have `OpenAndFillTmpDBColumnFamilies()` assign `db.Cleanup = toDefer` inside to simplify this. ...Ditto all the other places calling `OpenAndFillTmpDBColumnFamilies()`.
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
expandedResolveResult := db.Resolve(url)
log.Printf("%#v\n", expandedResolveResult)
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
@ -321,11 +321,11 @@ func TestResolve(t *testing.T) {
func TestGetDBState(t *testing.T) {
filePath := "../testdata/s_resolve.csv"
want := uint32(1072108)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
state, err := db.GetDBState()
if err != nil {
t.Error(err)
@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/W_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/j_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) {
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
filePath := "../testdata/V_resolve.csv"
// want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
res, err := db.GetRepost(channelHash)
if err != nil {
@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
filePath := "../testdata/Z_resolve.csv"
want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetClaimsInChannelCount(channelHash)
if err != nil {
t.Error(err)
@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
var position uint16 = 0
filePath := "../testdata/F_resolve.csv"
log.Println(filePath)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
if err != nil {
t.Error(err)
@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
filePath := "../testdata/F_cat.csv"
normalName := "cat"
claimId := "0"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
ch := db.ClaimShortIdIter(normalName, claimId)
@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
var txNum uint32 = 1456296
var position uint16 = 0
filePath := "../testdata/G_2.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
val, err := db.GetCachedClaimHash(txNum, position)
if err != nil {
t.Error(err)
@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
var val []byte = nil
filePath := "../testdata/I_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
val, err = db.GetChannelForClaim(claimHash, txNum, position)
if err != nil {
@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
want := uint64(20000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, true)
@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
want := uint64(21000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, false)
@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
t.Error(err)
}
filePath := "../testdata/a_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
res, err := db.GetSupportAmount(claimHash)
if err != nil {
t.Error(err)
@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) {
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
filePath := "../testdata/X_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
resHash, err := db.GetTxHash(txNum)
if err != nil {
t.Error(err)
@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) {
txNum := uint32(0x6284e3)
position := uint16(0x0)
want := uint32(0xa6b65)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
activation, err := db.GetActivation(txNum, position)
if err != nil {
t.Error(err)
@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
return
}
filePath := "../testdata/E_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
res, err := db.GetCachedClaimTxo(claimHash, true)
if err != nil {
t.Error(err)
@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
claimName := internal.NormalizeName("@Styxhexenhammer666")
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
filePath := "../testdata/P_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
res, err := db.GetControllingClaim(claimName)
if err != nil {
t.Error(err)

View file

@ -6,6 +6,7 @@ import (
"bytes"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
@ -22,9 +23,8 @@ type IterOptions struct {
IncludeValue bool
RawKey bool
RawValue bool
ShutdownChan chan struct{}
DoneChan chan struct{}
DB *ReadOnlyDBColumnFamily
Grp *stop.Group
// DB *ReadOnlyDBColumnFamily
CfHandle *grocksdb.ColumnFamilyHandle
It *grocksdb.Iterator
Serializer *prefixes.SerializationAPI
@ -43,9 +43,8 @@ func NewIterateOptions() *IterOptions {
IncludeValue: false,
RawKey: false,
RawValue: false,
ShutdownChan: make(chan struct{}, 1),
DoneChan: make(chan struct{}, 1),
DB: nil,
Grp: nil,
// DB: nil,
CfHandle: nil,
It: nil,
Serializer: prefixes.ProductionAPI,
@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
}
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
o.DB = db
// o.Grp.AddNamed(1, iterKey)
o.Grp = stop.New(db.Grp)
o.Grp.Add(1)
return o
}

5
go.mod
View file

@ -24,9 +24,6 @@ require (
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
)
require (
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
@ -43,7 +40,7 @@ require (
github.com/prometheus/procfs v0.6.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect

4
go.sum
View file

@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=

12
main.go
View file

@ -10,6 +10,7 @@ import (
"github.com/lbryio/herald.go/internal"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
@ -27,13 +28,16 @@ func main() {
if args.CmdType == server.ServeCmd {
// This will cancel goroutines with the server finishes.
ctxWCancel, cancel := context.WithCancel(ctx)
defer cancel()
// ctxWCancel, cancel := context.WithCancel(ctx)
// defer cancel()
stopGroup := stop.New()
// defer stopGroup.Stop()
initsignals()
initsignals(stopGroup.Ch())
interrupt := interruptListener()
s := server.MakeHubServer(ctxWCancel, args)
// s := server.MakeHubServer(ctxWCancel, args)
s := server.MakeHubServer(stopGroup, args)
go s.Run()
defer func() {

View file

@ -88,7 +88,7 @@ test_command_with_want
### blockchain.block.get_chunk
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
| jq .result | sed 's/"//g' | head -c 100
EOM
@ -97,7 +97,7 @@ test_command_with_want
### blockchain.block.get_header
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
| jq .result.timestamp
EOM
@ -106,7 +106,7 @@ test_command_with_want
### blockchain.block.headers
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
| jq .result.count
EOM
@ -116,7 +116,7 @@ test_command_with_want
## blockchain.claimtrie
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
| jq .result.txos[0].tx_hash | sed 's/"//g'
EOM
@ -128,7 +128,7 @@ test_command_with_want
### blockchain.address.get_balance
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .result.confirmed
EOM
@ -138,7 +138,7 @@ test_command_with_want
## blockchain.address.get_history
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result.confirmed | length'
EOM
@ -148,7 +148,7 @@ test_command_with_want
## blockchain.address.listunspent
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result | length'
EOM
@ -160,7 +160,7 @@ test_command_with_want
## blockchain.scripthash.get_mempool
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
@ -170,7 +170,7 @@ test_command_with_want
## blockchain.scripthash.get_history
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
@ -180,13 +180,42 @@ test_command_with_want
## blockchain.scripthash.listunspent
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
WANT="encoding/hex: invalid byte: U+0047 'G'"
test_command_with_want
## server.banner
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.banner", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="You are connected to an 0.107.0 server."
test_command_with_want
## server.version
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.version", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="0.107.0"
test_command_with_want
## server.features
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.features", "params":[]}'
EOM
WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}'
test_command_with_want
# metrics endpoint testing
WANT=0

View file

@ -1,6 +1,7 @@
package server
import (
"fmt"
"log"
"os"
"strconv"
@ -36,9 +37,20 @@ type Args struct {
RefreshDelta int
CacheTTL int
PeerFile string
Banner *string
Country string
BlockingChannelIds []string
FilteringChannelIds []string
GenesisHash string
ServerVersion string
ProtocolMin string
ProtocolMax string
ServerDescription string
PaymentAddress string
DonationAddress string
DailyFee string
Debug bool
DisableEs bool
DisableLoadPeers bool
@ -63,12 +75,23 @@ const (
DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080"
DefaultJSONRPCPort = 50001
DefaultJSONRPCHTTPPort = 50002
DefaultMaxSessions = 10000
DefaultSessionTimeout = 300
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
DefaultBannerFile = ""
DefaultCountry = "US"
HUB_PROTOCOL_VERSION = "0.107.0"
PROTOCOL_MIN = "0.54.0"
PROTOCOL_MAX = "0.199.0"
DefaultServerDescription = "Herald"
DefaultPaymentAddress = ""
DefaultDonationAddress = ""
DefaultDailyFee = "1.0"
DefaultDisableLoadPeers = false
DefaultDisableStartPrometheus = false
DefaultDisableStartUDP = false
@ -86,6 +109,73 @@ var (
DefaultFilteringChannelIds = []string{}
)
func loadBanner(bannerFile *string, serverVersion string) *string {
var banner string
data, err := os.ReadFile(*bannerFile)
if err != nil {
banner = fmt.Sprintf("You are connected to an %s server.", serverVersion)
} else {
banner = string(data)
}
/*
banner := os.Getenv("BANNER")
if banner == "" {
return nil
}
*/
return &banner
}
// MakeDefaultArgs creates a default set of arguments for testing the server.
func MakeDefaultTestArgs() *Args {
args := &Args{
CmdType: ServeCmd,
Host: DefaultHost,
Port: DefaultPort,
DBPath: DefaultDBPath,
EsHost: DefaultEsHost,
EsPort: DefaultEsPort,
PrometheusPort: DefaultPrometheusPort,
NotifierPort: DefaultNotifierPort,
JSONRPCPort: DefaultJSONRPCPort,
JSONRPCHTTPPort: DefaultJSONRPCHTTPPort,
MaxSessions: DefaultMaxSessions,
SessionTimeout: DefaultSessionTimeout,
EsIndex: DefaultEsIndex,
RefreshDelta: DefaultRefreshDelta,
CacheTTL: DefaultCacheTTL,
PeerFile: DefaultPeerFile,
Banner: nil,
Country: DefaultCountry,
GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(),
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: DefaultServerDescription,
PaymentAddress: DefaultPaymentAddress,
DonationAddress: DefaultDonationAddress,
DailyFee: DefaultDailyFee,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// GetEnvironment takes the environment variables as an array of strings
// and a getkeyval function to turn it into a map.
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string {
func ParseArgs(searchRequest *pb.SearchRequest) *Args {
environment := GetEnvironmentStandard()
parser := argparse.NewParser("hub", "hub server and client")
parser := argparse.NewParser("herald", "herald server and client")
serveCmd := parser.NewCommand("serve", "start the hub server")
searchCmd := parser.NewCommand("search", "claim search")
@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
return err
}
// main server config arguments
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile})
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
// arguments for server features
serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription})
paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress})
donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress})
dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee})
// flags for disabling features
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
// search command arguments
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
@ -177,6 +277,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
*jsonRPCPort = DefaultJSONRPCPort
}
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
args := &Args{
CmdType: SearchCmd,
Host: *host,
@ -195,9 +297,20 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
PeerFile: *peerFile,
Banner: banner,
Country: *country,
BlockingChannelIds: *blockingChannelIds,
FilteringChannelIds: *filteringChannelIds,
GenesisHash: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: *serverDescription,
PaymentAddress: *paymentAddress,
DonationAddress: *donationAddress,
DailyFee: *dailyFee,
Debug: *debug,
DisableEs: *disableEs,
DisableLoadPeers: *disableLoadPeers,

View file

@ -12,7 +12,8 @@ import (
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
dto "github.com/prometheus/client_model/go"
"google.golang.org/grpc"
)
@ -44,43 +45,11 @@ func removeFile(fileName string) {
}
}
// makeDefaultArgs creates a default set of arguments for testing the server.
func makeDefaultArgs() *server.Args {
args := &server.Args{
CmdType: server.ServeCmd,
Host: server.DefaultHost,
Port: server.DefaultPort,
DBPath: server.DefaultDBPath,
EsHost: server.DefaultEsHost,
EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort,
NotifierPort: server.DefaultNotifierPort,
JSONRPCPort: server.DefaultJSONRPCPort,
EsIndex: server.DefaultEsIndex,
RefreshDelta: server.DefaultRefreshDelta,
CacheTTL: server.DefaultCacheTTL,
PeerFile: server.DefaultPeerFile,
Country: server.DefaultCountry,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// TestAddPeer tests the ability to add peers
func TestAddPeer(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
tests := []struct {
name string
@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) {
// TestPeerWriter tests that peers get written properly
func TestPeerWriter(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableWritePeers = false
tests := []struct {
@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) {
// TestAddPeerEndpoint tests the ability to add peers
func TestAddPeerEndpoint(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args2.Port = "50052"
tests := []struct {
@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) {
// TestAddPeerEndpoint2 tests the ability to add peers
func TestAddPeerEndpoint2(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
args3 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args3.Port = "50053"
@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
// TestAddPeerEndpoint3 tests the ability to add peers
func TestAddPeerEndpoint3(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
args3 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args3.Port = "50053"
@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
// TestAddPeer tests the ability to add peers
func TestUDPServer(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false
args2 := makeDefaultArgs()
args2 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args2.DisableStartUDP = false

View file

@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead
type BlockGetServerHeightReq struct{}
type BlockGetServerHeightResp uint32
// blockchain.block.get_server_height
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
if s.DB == nil || s.DB.LastState == nil {
return fmt.Errorf("unknown height")

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcutil"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
@ -57,8 +58,9 @@ var regTestAddrs = [30]string{
func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) {
func TestGetChunk(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) {
func TestGetHeader(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) {
func TestHeaders(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) {
}
func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()
@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) {
// Set up logic to read a notification.
var received sync.WaitGroup
recv := func(client net.Conn) {
defer received.Done()
buf := make([]byte, 1024)
len, err := client.Read(buf)
if err != nil {
t.Errorf("read err: %v", err)
}
t.Logf("len: %v notification: %v", len, string(buf))
received.Done()
}
received.Add(2)
go recv(client1)
@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) {
func TestGetBalance(t *testing.T) {
secondaryPath := "asdf"
moodyjon commented 2022-10-19 16:25:36 +02:00 (Migrated from github.com)
Review

defer db.Shutdown() to match style elsewhere.

`defer db.Shutdown()` to match style elsewhere.
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) {
func TestGetHistory(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) {
func TestListUnspent(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) {
}
func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()

89
server/jsonrpc_server.go Normal file
View file

@ -0,0 +1,89 @@
package server
import (
log "github.com/sirupsen/logrus"
)
type ServerService struct {
Args *Args
}
type ServerFeatureService struct {
Args *Args
}
type ServerFeaturesReq struct{}
type ServerFeaturesRes struct {
Hosts map[string]string `json:"hosts"`
Pruning string `json:"pruning"`
ServerVersion string `json:"server_version"`
ProtocolMin string `json:"protocol_min"`
ProtocolMax string `json:"protocol_max"`
GenesisHash string `json:"genesis_hash"`
Description string `json:"description"`
PaymentAddress string `json:"payment_address"`
DonationAddress string `json:"donation_address"`
DailyFee string `json:"daily_fee"`
HashFunction string `json:"hash_function"`
TrendingAlgorithm string `json:"trending_algorithm"`
}
// Features is the json rpc endpoint for 'server.features'.
func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
log.Println("Features")
features := &ServerFeaturesRes{
Hosts: map[string]string{},
Pruning: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
GenesisHash: t.Args.GenesisHash,
Description: t.Args.ServerDescription,
PaymentAddress: t.Args.PaymentAddress,
DonationAddress: t.Args.DonationAddress,
DailyFee: t.Args.DailyFee,
HashFunction: "sha256",
TrendingAlgorithm: "fast_ar",
}
*res = features
return nil
}
type ServerBannerService struct {
Args *Args
}
type ServerBannerReq struct{}
type ServerBannerRes string
// Banner is the json rpc endpoint for 'server.banner'.
func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
log.Println("Banner")
*res = (*ServerBannerRes)(t.Args.Banner)
return nil
}
type ServerVersionService struct {
Args *Args
}
type ServerVersionReq struct{}
type ServerVersionRes string
// Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version")
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
return nil
}

View file

@ -121,6 +121,14 @@ fail1:
goto fail2
}
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{s.Args}
err = s1.RegisterTCPService(serverSvc, "server")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)

View file

@ -1,7 +1,6 @@
package server_test
import (
"context"
"encoding/hex"
"fmt"
"net"
@ -10,6 +9,7 @@ import (
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/sirupsen/logrus"
)
@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) {
}
func TestNotifierServer(t *testing.T) {
args := makeDefaultArgs()
ctx := context.Background()
args := server.MakeDefaultTestArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
hub := server.MakeHubServer(ctx, args)
go hub.NotifierServer()

View file

@ -11,6 +11,7 @@ import (
pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7"
)
@ -55,13 +56,14 @@ func TestSearch(t *testing.T) {
w.Write([]byte(resp))
}
context := context.Background()
args := makeDefaultArgs()
hubServer := server.MakeHubServer(context, args)
ctx := context.Background()
stopGroup := stop.NewDebug()
args := server.MakeDefaultTestArgs()
hubServer := server.MakeHubServer(stopGroup, args)
req := &pb.SearchRequest{
Text: "asdf",
}
out, err := hubServer.Search(context, req)
out, err := hubServer.Search(ctx, req)
if err != nil {
log.Println(err)
}

View file

@ -22,6 +22,7 @@ import (
"github.com/lbryio/herald.go/meta"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -53,6 +54,7 @@ type Server struct {
HeightSubs map[net.Addr]net.Conn
HeightSubsMut sync.RWMutex
NotifierChan chan interface{}
Grp *stop.Group
sessionManager *sessionManager
pb.UnimplementedHubServer
}
@ -149,7 +151,7 @@ func (s *Server) Run() {
}
}
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil {
logrus.Info(err)
@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
if err != nil {
logrus.Info(err)
}
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
if err != nil {
// Can't load the db, fail loudly
logrus.Info(err)
@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
// MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines.
func MakeHubServer(ctx context.Context, args *Args) *Server {
func MakeHubServer(grp *stop.Group, args *Args) *Server {
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
//TODO: is this the right place to load the db?
var myDB *db.ReadOnlyDBColumnFamily
// var dbShutdown = func() {}
if !args.DisableResolve {
myDB, err = LoadDatabase(args)
myDB, err = LoadDatabase(args, grp)
moodyjon commented 2022-10-19 15:57:30 +02:00 (Migrated from github.com)
Review

Not needed.

Not needed.
if err != nil {
logrus.Warning(err)
}
}
moodyjon commented 2022-10-14 16:20:05 +02:00 (Migrated from github.com)
Review

Ahaa. So this is how it gets set. You might consider passing grp down into LoadDatabase(). So it gets initialized ASAP. Also, don't forget the ReadonlyDBColumnFamilies instance constructed in db_test.go.

Ahaa. So this is how it gets set. You might consider passing `grp` down into `LoadDatabase()`. So it gets initialized ASAP. Also, don't forget the `ReadonlyDBColumnFamilies` instance constructed in db_test.go.
// Determine which chain to use based on db and cli values
dbChain := (*chaincfg.Params)(nil)
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
// The chain params can be inferred from DBStateValue.
@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}
logrus.Infof("network: %v", chain.Name)
args.GenesisHash = chain.GenesisHash.String()
sessionGrp := stop.New(grp)
s := &Server{
GrpcServer: grpcServer,
Args: args,
@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}),
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout),
Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
}
// Start up our background services

View file

@ -1,5 +1,11 @@
package server
import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
return s.addPeer
}
@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
func (s *Server) GetNumPeersExported() func() int64 {
return s.getNumPeers
}
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(db, args, grp, chain)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
@ -116,11 +117,13 @@ type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state
sessionsMut sync.RWMutex
sessions sessionMap
sessionsWait sync.WaitGroup
// sessionsWait sync.WaitGroup
grp *stop.Group
sessionsMax int
sessionTimeout time.Duration
manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily
args *Args
chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap
@ -128,13 +131,15 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap
}
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{
sessions: make(sessionMap),
sessionsMax: sessionsMax,
sessionTimeout: time.Duration(sessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
grp: grp,
sessionsMax: args.MaxSessions,
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db,
args: args,
chain: chain,
headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
}
func (sm *sessionManager) start() {
sm.grp.Add(1)
go sm.manage()
}
@ -168,7 +174,13 @@ func (sm *sessionManager) manage() {
}
sm.sessionsMut.Unlock()
// Wait for next management clock tick.
<-sm.manageTicker.C
select {
case <-sm.grp.Ch():
sm.grp.Done()
return
case <-sm.manageTicker.C:
continue
}
}
}
@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
// each request and update subscriptions.
s1 := rpc.NewServer()
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{sm.args}
err := s1.RegisterName("server", serverSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db}
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil {
log.Errorf("RegisterService: %v\n", err)
log.Errorf("RegisterName: %v\n", err)
}
// Register other "blockchain.{block,address,scripthash}.*" handlers.
@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
goto fail
}
sm.sessionsWait.Add(1)
sm.grp.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done()
sm.grp.Done()
}()
return sess

View file

@ -11,7 +11,7 @@ import (
// TestUDPPing tests UDPPing correctness against prod server.
func TestUDPPing(t *testing.T) {
args := makeDefaultArgs()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = true
tests := []struct {

View file

@ -8,12 +8,13 @@ import (
"os"
"os/signal"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
// shutdownRequestChannel is used to initiate shutdown from one of the
// subsystems using the same code paths as when an interrupt signal is received.
var shutdownRequestChannel = make(chan struct{})
var shutdownRequestChannel = make(stop.Chan)
// interruptSignals defines the default signals to catch in order to do a proper
// shutdown. This may be modified during init depending on the platform.

View file

@ -10,9 +10,12 @@ package main
import (
"os"
"syscall"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// initsignals sets the signals to be caught by the signal handler
func initsignals() {
func initsignals(stopCh stop.Chan) {
shutdownRequestChannel = stopCh
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}