From cd7f20a4618e8c22511b17c137b5faa1cbf10143 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Tue, 25 Oct 2022 08:48:13 +0300 Subject: [PATCH] Server endpoints goroutine refactor (#69) * server.xxx endpoints Additional server endpoints in jsonrpc and also some refactoring * server.banner * more endpoints * use lbry.go stop pattern * set genesis hash properly * updates and refactors * remove shutdowncalled and itmut * remove OpenIterators * remove shutdown and done channels from db and use stop group * currently broken, incorporated stop groups into the session manager * set the rest of the default args for tests * add default json rpc http port and cleanup * tests for new jsonrpc endpoints * cleanup and add manage goroutine to stopper pattern * cleanup * NewDebug * asdf * fix --- db/db.go | 76 +++------- db/db_test.go | 119 +++++++++------- db/iteroptions.go | 27 ++-- go.mod | 5 +- go.sum | 4 - main.go | 12 +- scripts/integration_tests.sh | 49 +++++-- server/args.go | 229 ++++++++++++++++++++++-------- server/federation_test.go | 78 ++++------ server/jsonrpc_blockchain.go | 1 + server/jsonrpc_blockchain_test.go | 54 ++++--- server/jsonrpc_server.go | 89 ++++++++++++ server/jsonrpc_service.go | 8 ++ server/notifier_test.go | 7 +- server/search_test.go | 10 +- server/server.go | 22 +-- server/server_test_pkg.go | 10 ++ server/session.go | 43 ++++-- server/udp_test.go | 2 +- signal.go | 3 +- signalsigterm.go | 5 +- 21 files changed, 546 insertions(+), 307 deletions(-) create mode 100644 server/jsonrpc_server.go diff --git a/db/db.go b/db/db.go index b659ff4..4051c50 100644 --- a/db/db.go +++ b/db/db.go @@ -8,7 +8,6 @@ import ( "encoding/hex" "fmt" "os" - "sync" "time" "github.com/lbryio/herald.go/db/prefixes" @@ -16,6 +15,7 @@ import ( "github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" log "github.com/sirupsen/logrus" @@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct { BlockedChannels map[string][]byte FilteredStreams map[string][]byte FilteredChannels map[string][]byte - OpenIterators map[string][]chan struct{} - ItMut sync.RWMutex - ShutdownChan chan struct{} - DoneChan chan struct{} - ShutdownCalled bool + Grp *stop.Group Cleanup func() } @@ -339,19 +335,10 @@ func interruptRequested(interrupted <-chan struct{}) bool { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) - iterKey := fmt.Sprintf("%p", opts) - if opts.DB != nil { - opts.DB.ItMut.Lock() - // There is a tiny chance that we were wating on the above lock while shutdown was - // being called and by the time we get it the db has already notified all active - // iterators to shutdown. In this case we go to the else branch. - if !opts.DB.ShutdownCalled { - opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} - opts.DB.ItMut.Unlock() - } else { - opts.DB.ItMut.Unlock() - return ch - } + // Check if we've been told to shutdown in between getting created and getting here + if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) { + opts.Grp.Done() + return ch } ro := grocksdb.NewDefaultReadOptions() @@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { it.Close() close(ch) ro.Destroy() - if opts.DB != nil { - opts.DoneChan <- struct{}{} - opts.DB.ItMut.Lock() - delete(opts.DB.OpenIterators, iterKey) - opts.DB.ItMut.Unlock() + if opts.Grp != nil { + // opts.Grp.DoneNamed(iterKey) + opts.Grp.Done() } }() @@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { if kv = opts.ReadRow(&prevKey); kv != nil { ch <- kv } - if interruptRequested(opts.ShutdownChan) { + if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) { return } } @@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er } // GetProdDB returns a db that is used for production. -func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) { +func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) { prefixNames := prefixes.GetPrefixes() // additional prefixes that aren't in the code explicitly cfNames := []string{"default", "e", "d", "c"} @@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func cfNames = append(cfNames, cfName) } - db, err := GetDBColumnFamilies(name, secondaryPath, cfNames) + db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp) cleanupFiles := func() { err = os.RemoveAll(secondaryPath) @@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func } if err != nil { - return nil, cleanupFiles, err + cleanupFiles() + return nil, err } cleanupDB := func() { @@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func } db.Cleanup = cleanupDB - return db, cleanupDB, nil + return db, nil } // GetDBColumnFamilies gets a db with the specified column families and secondary path. -func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) { +func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) { opts := grocksdb.NewDefaultOptions() roOpts := grocksdb.NewDefaultReadOptions() cfOpt := grocksdb.NewDefaultOptions() @@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R LastState: nil, Height: 0, Headers: nil, - OpenIterators: make(map[string][]chan struct{}), - ItMut: sync.RWMutex{}, - ShutdownChan: make(chan struct{}, 1), - ShutdownCalled: false, - DoneChan: make(chan struct{}, 1), + Grp: grp, } err = myDB.ReadDBState() //TODO: Figure out right place for this @@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { - log.Println("Sending message to ShutdownChan...") - db.ShutdownChan <- struct{}{} - log.Println("Locking iterator mutex...") - db.ItMut.Lock() - log.Println("Setting ShutdownCalled to true...") - db.ShutdownCalled = true - log.Println("Notifying iterators to shutdown...") - for _, it := range db.OpenIterators { - it[1] <- struct{}{} - } - log.Println("Waiting for iterators to shutdown...") - for _, it := range db.OpenIterators { - <-it[0] - } - log.Println("Unlocking iterator mutex...") - db.ItMut.Unlock() - log.Println("Sending message to DoneChan...") - <-db.DoneChan + db.Grp.StopAndWait() log.Println("Calling cleanup...") db.Cleanup() log.Println("Leaving Shutdown...") @@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() { // to keep the db readonly view up to date and handle reorgs on the // blockchain. func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { + db.Grp.Add(1) go func() { lastPrint := time.Now() for { @@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { log.Infof("Error detecting changes: %#v", err) } select { - case <-db.ShutdownChan: - db.DoneChan <- struct{}{} + case <-db.Grp.Ch(): + db.Grp.Done() return case <-time.After(time.Millisecond * 10): } diff --git a/db/db_test.go b/db/db_test.go index 2855753..e7114eb 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -7,12 +7,12 @@ import ( "log" "os" "strings" - "sync" "testing" dbpkg "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/internal" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" ) @@ -21,7 +21,7 @@ import ( //////////////////////////////////////////////////////////////////////////////// // OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names -func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) { +func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) { log.Println(filePath) file, err := os.Open(filePath) @@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami reader := csv.NewReader(file) records, err := reader.ReadAll() if err != nil { - return nil, nil, nil, err + return nil, nil, err } wOpts := grocksdb.NewDefaultWriteOptions() @@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami opts.SetCreateIfMissing(true) db, err := grocksdb.OpenDb(opts, "tmp") if err != nil { - return nil, nil, nil, err + return nil, nil, err } var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle) @@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami log.Println(cfName) handle, err := db.CreateColumnFamily(opts, cfName) if err != nil { - return nil, nil, nil, err + return nil, nil, err } handleMap[cfName] = handle } @@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami for _, record := range records[1:] { cf := record[0] if err != nil { - return nil, nil, nil, err + return nil, nil, err } handle := handleMap[string(cf)] key, err := hex.DecodeString(record[1]) if err != nil { - return nil, nil, nil, err + return nil, nil, err } val, err := hex.DecodeString(record[2]) if err != nil { - return nil, nil, nil, err + return nil, nil, err } db.PutCF(wOpts, handle, key, val) } @@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami LastState: nil, Height: 0, Headers: nil, - OpenIterators: make(map[string][]chan struct{}), - ItMut: sync.RWMutex{}, - ShutdownChan: make(chan struct{}, 1), - DoneChan: make(chan struct{}, 1), - ShutdownCalled: false, + Grp: stop.New(), + Cleanup: toDefer, } // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this @@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami err = myDB.InitTxCounts() if err != nil { - return nil, nil, nil, err + return nil, nil, err } // err = dbpkg.InitHeaders(myDB) @@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami // return nil, nil, nil, err // } - return myDB, records, toDefer, nil + return myDB, records, nil } // OpenAndFillTmpDBCF opens a db and fills it with data from a csv file @@ -247,17 +244,18 @@ func CatCSV(filePath string) { func TestCatFullDB(t *testing.T) { t.Skip("Skipping full db test") + grp := stop.New() // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" // url := "lbry://@lbry" // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" - dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" + dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) + db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + defer db.Shutdown() - defer toDefer() if err != nil { t.Error(err) return @@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) { // TestOpenFullDB Tests running a resolve on a full db. func TestOpenFullDB(t *testing.T) { t.Skip("Skipping full db test") + grp := stop.New() // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" @@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) { // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" // url := "lbry://@lbry$1" url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c" - dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" + dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) - defer toDefer() + db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) { func TestResolve(t *testing.T) { url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" filePath := "../testdata/FULL_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + expandedResolveResult := db.Resolve(url) log.Printf("%#v\n", expandedResolveResult) if expandedResolveResult != nil && expandedResolveResult.Channel != nil { @@ -321,11 +321,11 @@ func TestResolve(t *testing.T) { func TestGetDBState(t *testing.T) { filePath := "../testdata/s_resolve.csv" want := uint32(1072108) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() state, err := db.GetDBState() if err != nil { t.Error(err) @@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) { // Should be non-existent channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/W_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) { // Should be non-existent channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/j_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) { channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2") filePath := "../testdata/V_resolve.csv" // want := uint32(3670) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() res, err := db.GetRepost(channelHash) if err != nil { @@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) { channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd") filePath := "../testdata/Z_resolve.csv" want := uint32(3670) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + count, err := db.GetClaimsInChannelCount(channelHash) if err != nil { t.Error(err) @@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) { var position uint16 = 0 filePath := "../testdata/F_resolve.csv" log.Println(filePath) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position) if err != nil { t.Error(err) @@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) { filePath := "../testdata/F_cat.csv" normalName := "cat" claimId := "0" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() ch := db.ClaimShortIdIter(normalName, claimId) @@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) { var txNum uint32 = 1456296 var position uint16 = 0 filePath := "../testdata/G_2.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + val, err := db.GetCachedClaimHash(txNum, position) if err != nil { t.Error(err) @@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) { var val []byte = nil filePath := "../testdata/I_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() val, err = db.GetChannelForClaim(claimHash, txNum, position) if err != nil { @@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) { want := uint64(20000006) claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, true) @@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) { want := uint64(21000006) claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, false) @@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) { t.Error(err) } filePath := "../testdata/a_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + res, err := db.GetSupportAmount(claimHash) if err != nil { t.Error(err) @@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) { want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde" filePath := "../testdata/X_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + resHash, err := db.GetTxHash(txNum) if err != nil { t.Error(err) @@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) { txNum := uint32(0x6284e3) position := uint16(0x0) want := uint32(0xa6b65) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + activation, err := db.GetActivation(txNum, position) if err != nil { t.Error(err) @@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) { return } filePath := "../testdata/E_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + res, err := db.GetCachedClaimTxo(claimHash, true) if err != nil { t.Error(err) @@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) { claimName := internal.NormalizeName("@Styxhexenhammer666") claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd" filePath := "../testdata/P_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + res, err := db.GetControllingClaim(claimName) if err != nil { t.Error(err) diff --git a/db/iteroptions.go b/db/iteroptions.go index 6d1ea7f..c4b2467 100644 --- a/db/iteroptions.go +++ b/db/iteroptions.go @@ -6,6 +6,7 @@ import ( "bytes" "github.com/lbryio/herald.go/db/prefixes" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" log "github.com/sirupsen/logrus" @@ -22,12 +23,11 @@ type IterOptions struct { IncludeValue bool RawKey bool RawValue bool - ShutdownChan chan struct{} - DoneChan chan struct{} - DB *ReadOnlyDBColumnFamily - CfHandle *grocksdb.ColumnFamilyHandle - It *grocksdb.Iterator - Serializer *prefixes.SerializationAPI + Grp *stop.Group + // DB *ReadOnlyDBColumnFamily + CfHandle *grocksdb.ColumnFamilyHandle + It *grocksdb.Iterator + Serializer *prefixes.SerializationAPI } // NewIterateOptions creates a defualt options structure for a db iterator. @@ -43,12 +43,11 @@ func NewIterateOptions() *IterOptions { IncludeValue: false, RawKey: false, RawValue: false, - ShutdownChan: make(chan struct{}, 1), - DoneChan: make(chan struct{}, 1), - DB: nil, - CfHandle: nil, - It: nil, - Serializer: prefixes.ProductionAPI, + Grp: nil, + // DB: nil, + CfHandle: nil, + It: nil, + Serializer: prefixes.ProductionAPI, } } @@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { } func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { - o.DB = db + // o.Grp.AddNamed(1, iterKey) + o.Grp = stop.New(db.Grp) + o.Grp.Add(1) return o } diff --git a/go.mod b/go.mod index d938900..05819a8 100644 --- a/go.mod +++ b/go.mod @@ -24,9 +24,6 @@ require ( gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c ) -require ( -) - require ( github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect @@ -43,7 +40,7 @@ require ( github.com/prometheus/procfs v0.6.0 // indirect github.com/stretchr/testify v1.7.0 // indirect golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect diff --git a/go.sum b/go.sum index 7593b5b..f8bc4ca 100644 --- a/go.sum +++ b/go.sum @@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs= github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g= github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA= -github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo= -github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA= github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c= github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E= github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0= @@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs= github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0= -github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g= -github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/main.go b/main.go index 1e42b45..e78be35 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/lbryio/herald.go/internal" pb "github.com/lbryio/herald.go/protobuf/go" "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -27,13 +28,16 @@ func main() { if args.CmdType == server.ServeCmd { // This will cancel goroutines with the server finishes. - ctxWCancel, cancel := context.WithCancel(ctx) - defer cancel() + // ctxWCancel, cancel := context.WithCancel(ctx) + // defer cancel() + stopGroup := stop.New() + // defer stopGroup.Stop() - initsignals() + initsignals(stopGroup.Ch()) interrupt := interruptListener() - s := server.MakeHubServer(ctxWCancel, args) + // s := server.MakeHubServer(ctxWCancel, args) + s := server.MakeHubServer(stopGroup, args) go s.Run() defer func() { diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh index c9d1cc4..b24bf82 100755 --- a/scripts/integration_tests.sh +++ b/scripts/integration_tests.sh @@ -88,7 +88,7 @@ test_command_with_want ### blockchain.block.get_chunk read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}' | jq .result | sed 's/"//g' | head -c 100 EOM @@ -97,7 +97,7 @@ test_command_with_want ### blockchain.block.get_header read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.block.get_header", "params": []}' | jq .result.timestamp EOM @@ -106,7 +106,7 @@ test_command_with_want ### blockchain.block.headers read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.block.headers", "params": []}' | jq .result.count EOM @@ -116,7 +116,7 @@ test_command_with_want ## blockchain.claimtrie read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}' | jq .result.txos[0].tx_hash | sed 's/"//g' EOM @@ -128,7 +128,7 @@ test_command_with_want ### blockchain.address.get_balance read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq .result.confirmed EOM @@ -138,7 +138,7 @@ test_command_with_want ## blockchain.address.get_history read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq '.result.confirmed | length' EOM @@ -148,7 +148,7 @@ test_command_with_want ## blockchain.address.listunspent read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq '.result | length' EOM @@ -160,7 +160,7 @@ test_command_with_want ## blockchain.scripthash.get_mempool read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq .error | sed 's/"//g' EOM @@ -170,7 +170,7 @@ test_command_with_want ## blockchain.scripthash.get_history read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq .error | sed 's/"//g' EOM @@ -180,13 +180,42 @@ test_command_with_want ## blockchain.scripthash.listunspent read -r -d '' CMD <<- EOM - curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" --data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' | jq .error | sed 's/"//g' EOM WANT="encoding/hex: invalid byte: U+0047 'G'" test_command_with_want +## server.banner + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "server.banner", "params":[]}' + | jq .result | sed 's/"//g' +EOM +WANT="You are connected to an 0.107.0 server." +test_command_with_want + +## server.version + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "server.version", "params":[]}' + | jq .result | sed 's/"//g' +EOM +WANT="0.107.0" +test_command_with_want + +## server.features + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "server.features", "params":[]}' +EOM +WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}' +test_command_with_want + # metrics endpoint testing WANT=0 diff --git a/server/args.go b/server/args.go index 95ecce0..1f6438c 100644 --- a/server/args.go +++ b/server/args.go @@ -1,6 +1,7 @@ package server import ( + "fmt" "log" "os" "strconv" @@ -19,26 +20,37 @@ const ( // Args struct contains the arguments to the hub server. type Args struct { - CmdType int - Host string - Port string - DBPath string - Chain *string - EsHost string - EsPort string - PrometheusPort string - NotifierPort string - JSONRPCPort int - JSONRPCHTTPPort int - MaxSessions int - SessionTimeout int - EsIndex string - RefreshDelta int - CacheTTL int - PeerFile string - Country string - BlockingChannelIds []string - FilteringChannelIds []string + CmdType int + Host string + Port string + DBPath string + Chain *string + EsHost string + EsPort string + PrometheusPort string + NotifierPort string + JSONRPCPort int + JSONRPCHTTPPort int + MaxSessions int + SessionTimeout int + EsIndex string + RefreshDelta int + CacheTTL int + PeerFile string + Banner *string + Country string + BlockingChannelIds []string + FilteringChannelIds []string + + GenesisHash string + ServerVersion string + ProtocolMin string + ProtocolMax string + ServerDescription string + PaymentAddress string + DonationAddress string + DailyFee string + Debug bool DisableEs bool DisableLoadPeers bool @@ -54,21 +66,32 @@ type Args struct { } const ( - DefaultHost = "0.0.0.0" - DefaultPort = "50051" - DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME - DefaultEsHost = "http://localhost" - DefaultEsIndex = "claims" - DefaultEsPort = "9200" - DefaultPrometheusPort = "2112" - DefaultNotifierPort = "18080" - DefaultJSONRPCPort = 50001 - DefaultMaxSessions = 10000 - DefaultSessionTimeout = 300 - DefaultRefreshDelta = 5 - DefaultCacheTTL = 5 - DefaultPeerFile = "peers.txt" - DefaultCountry = "US" + DefaultHost = "0.0.0.0" + DefaultPort = "50051" + DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME + DefaultEsHost = "http://localhost" + DefaultEsIndex = "claims" + DefaultEsPort = "9200" + DefaultPrometheusPort = "2112" + DefaultNotifierPort = "18080" + DefaultJSONRPCPort = 50001 + DefaultJSONRPCHTTPPort = 50002 + DefaultMaxSessions = 10000 + DefaultSessionTimeout = 300 + DefaultRefreshDelta = 5 + DefaultCacheTTL = 5 + DefaultPeerFile = "peers.txt" + DefaultBannerFile = "" + DefaultCountry = "US" + + HUB_PROTOCOL_VERSION = "0.107.0" + PROTOCOL_MIN = "0.54.0" + PROTOCOL_MAX = "0.199.0" + DefaultServerDescription = "Herald" + DefaultPaymentAddress = "" + DefaultDonationAddress = "" + DefaultDailyFee = "1.0" + DefaultDisableLoadPeers = false DefaultDisableStartPrometheus = false DefaultDisableStartUDP = false @@ -86,6 +109,73 @@ var ( DefaultFilteringChannelIds = []string{} ) +func loadBanner(bannerFile *string, serverVersion string) *string { + var banner string + + data, err := os.ReadFile(*bannerFile) + if err != nil { + banner = fmt.Sprintf("You are connected to an %s server.", serverVersion) + } else { + banner = string(data) + } + + /* + banner := os.Getenv("BANNER") + if banner == "" { + return nil + } + */ + + return &banner +} + +// MakeDefaultArgs creates a default set of arguments for testing the server. +func MakeDefaultTestArgs() *Args { + args := &Args{ + CmdType: ServeCmd, + Host: DefaultHost, + Port: DefaultPort, + DBPath: DefaultDBPath, + EsHost: DefaultEsHost, + EsPort: DefaultEsPort, + PrometheusPort: DefaultPrometheusPort, + NotifierPort: DefaultNotifierPort, + JSONRPCPort: DefaultJSONRPCPort, + JSONRPCHTTPPort: DefaultJSONRPCHTTPPort, + MaxSessions: DefaultMaxSessions, + SessionTimeout: DefaultSessionTimeout, + EsIndex: DefaultEsIndex, + RefreshDelta: DefaultRefreshDelta, + CacheTTL: DefaultCacheTTL, + PeerFile: DefaultPeerFile, + Banner: nil, + Country: DefaultCountry, + + GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(), + ServerVersion: HUB_PROTOCOL_VERSION, + ProtocolMin: PROTOCOL_MIN, + ProtocolMax: PROTOCOL_MAX, + ServerDescription: DefaultServerDescription, + PaymentAddress: DefaultPaymentAddress, + DonationAddress: DefaultDonationAddress, + DailyFee: DefaultDailyFee, + + DisableEs: true, + Debug: true, + DisableLoadPeers: true, + DisableStartPrometheus: true, + DisableStartUDP: true, + DisableWritePeers: true, + DisableRocksDBRefresh: true, + DisableResolve: true, + DisableBlockingAndFiltering: true, + DisableStartNotifier: true, + DisableStartJSONRPC: true, + } + + return args +} + // GetEnvironment takes the environment variables as an array of strings // and a getkeyval function to turn it into a map. func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { @@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string { func ParseArgs(searchRequest *pb.SearchRequest) *Args { environment := GetEnvironmentStandard() - parser := argparse.NewParser("hub", "hub server and client") + parser := argparse.NewParser("herald", "herald server and client") serveCmd := parser.NewCommand("serve", "start the hub server") searchCmd := parser.NewCommand("search", "claim search") @@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { return err } + // main server config arguments host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) @@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) - jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort}) - jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort}) + jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort}) + jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort}) maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions}) sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile}) + bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile}) country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry}) blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds}) filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds}) + // arguments for server features + serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription}) + paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress}) + donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress}) + dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee}) + + // flags for disabling features debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false}) disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false}) disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers}) @@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier}) disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC}) + // search command arguments text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"}) @@ -177,27 +277,40 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { *jsonRPCPort = DefaultJSONRPCPort } + banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION) + args := &Args{ - CmdType: SearchCmd, - Host: *host, - Port: *port, - DBPath: *dbPath, - Chain: chain, - EsHost: *esHost, - EsPort: *esPort, - PrometheusPort: *prometheusPort, - NotifierPort: *notifierPort, - JSONRPCPort: *jsonRPCPort, - JSONRPCHTTPPort: *jsonRPCHTTPPort, - MaxSessions: *maxSessions, - SessionTimeout: *sessionTimeout, - EsIndex: *esIndex, - RefreshDelta: *refreshDelta, - CacheTTL: *cacheTTL, - PeerFile: *peerFile, - Country: *country, - BlockingChannelIds: *blockingChannelIds, - FilteringChannelIds: *filteringChannelIds, + CmdType: SearchCmd, + Host: *host, + Port: *port, + DBPath: *dbPath, + Chain: chain, + EsHost: *esHost, + EsPort: *esPort, + PrometheusPort: *prometheusPort, + NotifierPort: *notifierPort, + JSONRPCPort: *jsonRPCPort, + JSONRPCHTTPPort: *jsonRPCHTTPPort, + MaxSessions: *maxSessions, + SessionTimeout: *sessionTimeout, + EsIndex: *esIndex, + RefreshDelta: *refreshDelta, + CacheTTL: *cacheTTL, + PeerFile: *peerFile, + Banner: banner, + Country: *country, + BlockingChannelIds: *blockingChannelIds, + FilteringChannelIds: *filteringChannelIds, + + GenesisHash: "", + ServerVersion: HUB_PROTOCOL_VERSION, + ProtocolMin: PROTOCOL_MIN, + ProtocolMax: PROTOCOL_MAX, + ServerDescription: *serverDescription, + PaymentAddress: *paymentAddress, + DonationAddress: *donationAddress, + DailyFee: *dailyFee, + Debug: *debug, DisableEs: *disableEs, DisableLoadPeers: *disableLoadPeers, diff --git a/server/federation_test.go b/server/federation_test.go index 850500f..743c849 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -12,7 +12,8 @@ import ( "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" - server "github.com/lbryio/herald.go/server" + "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" dto "github.com/prometheus/client_model/go" "google.golang.org/grpc" ) @@ -44,43 +45,11 @@ func removeFile(fileName string) { } } -// makeDefaultArgs creates a default set of arguments for testing the server. -func makeDefaultArgs() *server.Args { - args := &server.Args{ - CmdType: server.ServeCmd, - Host: server.DefaultHost, - Port: server.DefaultPort, - DBPath: server.DefaultDBPath, - EsHost: server.DefaultEsHost, - EsPort: server.DefaultEsPort, - PrometheusPort: server.DefaultPrometheusPort, - NotifierPort: server.DefaultNotifierPort, - JSONRPCPort: server.DefaultJSONRPCPort, - EsIndex: server.DefaultEsIndex, - RefreshDelta: server.DefaultRefreshDelta, - CacheTTL: server.DefaultCacheTTL, - PeerFile: server.DefaultPeerFile, - Country: server.DefaultCountry, - DisableEs: true, - Debug: true, - DisableLoadPeers: true, - DisableStartPrometheus: true, - DisableStartUDP: true, - DisableWritePeers: true, - DisableRocksDBRefresh: true, - DisableResolve: true, - DisableBlockingAndFiltering: true, - DisableStartNotifier: true, - DisableStartJSONRPC: true, - } - - return args -} - // TestAddPeer tests the ability to add peers func TestAddPeer(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() tests := []struct { name string @@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) { // TestPeerWriter tests that peers get written properly func TestPeerWriter(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() args.DisableWritePeers = false tests := []struct { @@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) { // TestAddPeerEndpoint tests the ability to add peers func TestAddPeerEndpoint(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() - args2 := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() + args2 := server.MakeDefaultTestArgs() args2.Port = "50052" tests := []struct { @@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) { // TestAddPeerEndpoint2 tests the ability to add peers func TestAddPeerEndpoint2(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() - args2 := makeDefaultArgs() - args3 := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() + args2 := server.MakeDefaultTestArgs() + args3 := server.MakeDefaultTestArgs() args2.Port = "50052" args3.Port = "50053" @@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) { // TestAddPeerEndpoint3 tests the ability to add peers func TestAddPeerEndpoint3(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() - args2 := makeDefaultArgs() - args3 := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() + args2 := server.MakeDefaultTestArgs() + args3 := server.MakeDefaultTestArgs() args2.Port = "50052" args3.Port = "50053" @@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) { // TestAddPeer tests the ability to add peers func TestUDPServer(t *testing.T) { - ctx := context.Background() - args := makeDefaultArgs() + // ctx := context.Background() + ctx := stop.NewDebug() + args := server.MakeDefaultTestArgs() args.DisableStartUDP = false - args2 := makeDefaultArgs() + args2 := server.MakeDefaultTestArgs() args2.Port = "50052" args2.DisableStartUDP = false diff --git a/server/jsonrpc_blockchain.go b/server/jsonrpc_blockchain.go index 02be53b..64ce66c 100644 --- a/server/jsonrpc_blockchain.go +++ b/server/jsonrpc_blockchain.go @@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead type BlockGetServerHeightReq struct{} type BlockGetServerHeightResp uint32 +// blockchain.block.get_server_height func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error { if s.DB == nil || s.DB.LastState == nil { return fmt.Errorf("unknown height") diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index 2c94ebb..aab7678 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -13,6 +13,7 @@ import ( "github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/txscript" "github.com/lbryio/lbcutil" + "github.com/lbryio/lbry.go/v3/extras/stop" ) // Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions) @@ -57,8 +58,9 @@ var regTestAddrs = [30]string{ func TestServerGetHeight(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) { func TestGetChunk(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) { func TestGetHeader(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) { func TestHeaders(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) { } func TestHeadersSubscribe(t *testing.T) { + args := MakeDefaultTestArgs() + grp := stop.NewDebug() secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return } - sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() @@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) { // Set up logic to read a notification. var received sync.WaitGroup recv := func(client net.Conn) { + defer received.Done() buf := make([]byte, 1024) len, err := client.Read(buf) if err != nil { t.Errorf("read err: %v", err) } t.Logf("len: %v notification: %v", len, string(buf)) - received.Done() } received.Add(2) go recv(client1) @@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) { func TestGetBalance(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) { func TestGetHistory(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) { func TestListUnspent(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + grp := stop.NewDebug() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) { } func TestAddressSubscribe(t *testing.T) { + args := MakeDefaultTestArgs() + grp := stop.NewDebug() secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return } - sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() diff --git a/server/jsonrpc_server.go b/server/jsonrpc_server.go new file mode 100644 index 0000000..a818966 --- /dev/null +++ b/server/jsonrpc_server.go @@ -0,0 +1,89 @@ +package server + +import ( + log "github.com/sirupsen/logrus" +) + +type ServerService struct { + Args *Args +} + +type ServerFeatureService struct { + Args *Args +} + +type ServerFeaturesReq struct{} + +type ServerFeaturesRes struct { + Hosts map[string]string `json:"hosts"` + Pruning string `json:"pruning"` + ServerVersion string `json:"server_version"` + ProtocolMin string `json:"protocol_min"` + ProtocolMax string `json:"protocol_max"` + GenesisHash string `json:"genesis_hash"` + Description string `json:"description"` + PaymentAddress string `json:"payment_address"` + DonationAddress string `json:"donation_address"` + DailyFee string `json:"daily_fee"` + HashFunction string `json:"hash_function"` + TrendingAlgorithm string `json:"trending_algorithm"` +} + +// Features is the json rpc endpoint for 'server.features'. +func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error { + log.Println("Features") + + features := &ServerFeaturesRes{ + Hosts: map[string]string{}, + Pruning: "", + ServerVersion: HUB_PROTOCOL_VERSION, + ProtocolMin: PROTOCOL_MIN, + ProtocolMax: PROTOCOL_MAX, + GenesisHash: t.Args.GenesisHash, + Description: t.Args.ServerDescription, + PaymentAddress: t.Args.PaymentAddress, + DonationAddress: t.Args.DonationAddress, + DailyFee: t.Args.DailyFee, + HashFunction: "sha256", + TrendingAlgorithm: "fast_ar", + } + *res = features + + return nil +} + +type ServerBannerService struct { + Args *Args +} + +type ServerBannerReq struct{} + +type ServerBannerRes string + +// Banner is the json rpc endpoint for 'server.banner'. +func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error { + log.Println("Banner") + + *res = (*ServerBannerRes)(t.Args.Banner) + + return nil +} + +type ServerVersionService struct { + Args *Args +} + +type ServerVersionReq struct{} + +type ServerVersionRes string + +// Banner is the json rpc endpoint for 'server.version'. +// FIXME: This should return a struct with the version and the protocol version. +// <<-- that comment was written by github, scary shit because it's true +func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error { + log.Println("Version") + + *res = (*ServerVersionRes)(&t.Args.ServerVersion) + + return nil +} diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index df12f3d..ae0d757 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -121,6 +121,14 @@ fail1: goto fail2 } + // Register "server.{features,banner,version}" handlers. + serverSvc := &ServerService{s.Args} + err = s1.RegisterTCPService(serverSvc, "server") + if err != nil { + log.Errorf("RegisterTCPService: %v\n", err) + goto fail2 + } + r := gorilla_mux.NewRouter() r.Handle("/rpc", s1) port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10) diff --git a/server/notifier_test.go b/server/notifier_test.go index 3e820cc..d482b60 100644 --- a/server/notifier_test.go +++ b/server/notifier_test.go @@ -1,7 +1,6 @@ package server_test import ( - "context" "encoding/hex" "fmt" "net" @@ -10,6 +9,7 @@ import ( "github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/sirupsen/logrus" ) @@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) { } func TestNotifierServer(t *testing.T) { - args := makeDefaultArgs() - ctx := context.Background() + args := server.MakeDefaultTestArgs() + // ctx := context.Background() + ctx := stop.NewDebug() hub := server.MakeHubServer(ctx, args) go hub.NotifierServer() diff --git a/server/search_test.go b/server/search_test.go index 726746c..cd5a50a 100644 --- a/server/search_test.go +++ b/server/search_test.go @@ -11,6 +11,7 @@ import ( pb "github.com/lbryio/herald.go/protobuf/go" server "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/olivere/elastic/v7" ) @@ -55,13 +56,14 @@ func TestSearch(t *testing.T) { w.Write([]byte(resp)) } - context := context.Background() - args := makeDefaultArgs() - hubServer := server.MakeHubServer(context, args) + ctx := context.Background() + stopGroup := stop.NewDebug() + args := server.MakeDefaultTestArgs() + hubServer := server.MakeHubServer(stopGroup, args) req := &pb.SearchRequest{ Text: "asdf", } - out, err := hubServer.Search(context, req) + out, err := hubServer.Search(ctx, req) if err != nil { log.Println(err) } diff --git a/server/server.go b/server/server.go index 4bb1bb6..44f4bdf 100644 --- a/server/server.go +++ b/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/lbryio/herald.go/meta" pb "github.com/lbryio/herald.go/protobuf/go" "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/olivere/elastic/v7" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -53,6 +54,7 @@ type Server struct { HeightSubs map[net.Addr]net.Conn HeightSubsMut sync.RWMutex NotifierChan chan interface{} + Grp *stop.Group sessionManager *sessionManager pb.UnimplementedHubServer } @@ -149,7 +151,7 @@ func (s *Server) Run() { } } -func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { +func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { tmpName, err := ioutil.TempDir("", "go-lbry-hub") if err != nil { logrus.Info(err) @@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { if err != nil { logrus.Info(err) } - myDB, _, err := db.GetProdDB(args.DBPath, tmpName) - // dbShutdown = func() { - // db.Shutdown(myDB) - // } + myDB, err := db.GetProdDB(args.DBPath, tmpName, grp) if err != nil { // Can't load the db, fail loudly logrus.Info(err) @@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { // MakeHubServer takes the arguments given to a hub when it's started and // initializes everything. It loads information about previously known peers, // creates needed internal data structures, and initializes goroutines. -func MakeHubServer(ctx context.Context, args *Args) *Server { +func MakeHubServer(grp *stop.Group, args *Args) *Server { grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0)) multiSpaceRe, err := regexp.Compile(`\s{2,}`) @@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { //TODO: is this the right place to load the db? var myDB *db.ReadOnlyDBColumnFamily - // var dbShutdown = func() {} if !args.DisableResolve { - myDB, err = LoadDatabase(args) + myDB, err = LoadDatabase(args, grp) if err != nil { logrus.Warning(err) } } + // Determine which chain to use based on db and cli values dbChain := (*chaincfg.Params)(nil) if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil { // The chain params can be inferred from DBStateValue. @@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { } logrus.Infof("network: %v", chain.Name) + args.GenesisHash = chain.GenesisHash.String() + + sessionGrp := stop.New(grp) + s := &Server{ GrpcServer: grpcServer, Args: args, @@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { HeightSubs: make(map[net.Addr]net.Conn), HeightSubsMut: sync.RWMutex{}, NotifierChan: make(chan interface{}), - sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout), + Grp: grp, + sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), } // Start up our background services diff --git a/server/server_test_pkg.go b/server/server_test_pkg.go index 631cb9d..31230c7 100644 --- a/server/server_test_pkg.go +++ b/server/server_test_pkg.go @@ -1,5 +1,11 @@ package server +import ( + "github.com/lbryio/herald.go/db" + "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbry.go/v3/extras/stop" +) + func (s *Server) AddPeerExported() func(*Peer, bool, bool) error { return s.addPeer } @@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error { func (s *Server) GetNumPeersExported() func() int64 { return s.getNumPeers } + +func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { + return newSessionManager(db, args, grp, chain) +} diff --git a/server/session.go b/server/session.go index 9b9fbb7..5caf06b 100644 --- a/server/session.go +++ b/server/session.go @@ -14,6 +14,7 @@ import ( "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" ) @@ -114,13 +115,15 @@ type sessionMap map[uintptr]*session type sessionManager struct { // sessionsMut protects sessions, headerSubs, hashXSubs state - sessionsMut sync.RWMutex - sessions sessionMap - sessionsWait sync.WaitGroup + sessionsMut sync.RWMutex + sessions sessionMap + // sessionsWait sync.WaitGroup + grp *stop.Group sessionsMax int sessionTimeout time.Duration manageTicker *time.Ticker db *db.ReadOnlyDBColumnFamily + args *Args chain *chaincfg.Params // headerSubs are sessions subscribed via 'blockchain.headers.subscribe' headerSubs sessionMap @@ -128,13 +131,15 @@ type sessionManager struct { hashXSubs map[[HASHX_LEN]byte]sessionMap } -func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { +func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { return &sessionManager{ sessions: make(sessionMap), - sessionsMax: sessionsMax, - sessionTimeout: time.Duration(sessionTimeout) * time.Second, - manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second), + grp: grp, + sessionsMax: args.MaxSessions, + sessionTimeout: time.Duration(args.SessionTimeout) * time.Second, + manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second), db: db, + args: args, chain: chain, headerSubs: make(sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), @@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se } func (sm *sessionManager) start() { + sm.grp.Add(1) go sm.manage() } @@ -168,7 +174,13 @@ func (sm *sessionManager) manage() { } sm.sessionsMut.Unlock() // Wait for next management clock tick. - <-sm.manageTicker.C + select { + case <-sm.grp.Ch(): + sm.grp.Done() + return + case <-sm.manageTicker.C: + continue + } } } @@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { // each request and update subscriptions. s1 := rpc.NewServer() + // Register "server.{features,banner,version}" handlers. + serverSvc := &ServerService{sm.args} + err := s1.RegisterName("server", serverSvc) + if err != nil { + log.Errorf("RegisterName: %v\n", err) + } + // Register "blockchain.claimtrie.*"" handlers. claimtrieSvc := &ClaimtrieService{sm.db} - err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc) + err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) if err != nil { - log.Errorf("RegisterService: %v\n", err) + log.Errorf("RegisterName: %v\n", err) } // Register other "blockchain.{block,address,scripthash}.*" handlers. @@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { goto fail } - sm.sessionsWait.Add(1) + sm.grp.Add(1) go func() { s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) log.Infof("session %v goroutine exit", sess.addr.String()) - sm.sessionsWait.Done() + sm.grp.Done() }() return sess diff --git a/server/udp_test.go b/server/udp_test.go index 5f80f82..37f9b90 100644 --- a/server/udp_test.go +++ b/server/udp_test.go @@ -11,7 +11,7 @@ import ( // TestUDPPing tests UDPPing correctness against prod server. func TestUDPPing(t *testing.T) { - args := makeDefaultArgs() + args := server.MakeDefaultTestArgs() args.DisableStartUDP = true tests := []struct { diff --git a/signal.go b/signal.go index 7eab8a5..a2561a6 100644 --- a/signal.go +++ b/signal.go @@ -8,12 +8,13 @@ import ( "os" "os/signal" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" ) // shutdownRequestChannel is used to initiate shutdown from one of the // subsystems using the same code paths as when an interrupt signal is received. -var shutdownRequestChannel = make(chan struct{}) +var shutdownRequestChannel = make(stop.Chan) // interruptSignals defines the default signals to catch in order to do a proper // shutdown. This may be modified during init depending on the platform. diff --git a/signalsigterm.go b/signalsigterm.go index dbd9750..c9435fc 100644 --- a/signalsigterm.go +++ b/signalsigterm.go @@ -10,9 +10,12 @@ package main import ( "os" "syscall" + + "github.com/lbryio/lbry.go/v3/extras/stop" ) // initsignals sets the signals to be caught by the signal handler -func initsignals() { +func initsignals(stopCh stop.Chan) { + shutdownRequestChannel = stopCh interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} }