Server endpoints goroutine refactor (#69)

* server.xxx endpoints

Additional server endpoints in jsonrpc and also some refactoring

* server.banner

* more endpoints

* use lbry.go stop pattern

* set genesis hash properly

* updates and refactors

* remove shutdowncalled and itmut

* remove OpenIterators

* remove shutdown and done channels from db and use stop group

* currently broken, incorporated stop groups into the session manager

* set the rest of the default args for tests

* add default json rpc http port and cleanup

* tests for new jsonrpc endpoints

* cleanup and add manage goroutine to stopper pattern

* cleanup

* NewDebug

* asdf

* fix
This commit is contained in:
Jeffrey Picard 2022-10-25 08:48:13 +03:00 committed by GitHub
parent b85556499f
commit cd7f20a461
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 546 additions and 307 deletions

View file

@ -8,7 +8,6 @@ import (
"encoding/hex"
"fmt"
"os"
"sync"
"time"
"github.com/lbryio/herald.go/db/prefixes"
@ -16,6 +15,7 @@ import (
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
@ -59,11 +59,7 @@ type ReadOnlyDBColumnFamily struct {
BlockedChannels map[string][]byte
FilteredStreams map[string][]byte
FilteredChannels map[string][]byte
OpenIterators map[string][]chan struct{}
ItMut sync.RWMutex
ShutdownChan chan struct{}
DoneChan chan struct{}
ShutdownCalled bool
Grp *stop.Group
Cleanup func()
}
@ -339,19 +335,10 @@ func interruptRequested(interrupted <-chan struct{}) bool {
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
iterKey := fmt.Sprintf("%p", opts)
if opts.DB != nil {
opts.DB.ItMut.Lock()
// There is a tiny chance that we were wating on the above lock while shutdown was
// being called and by the time we get it the db has already notified all active
// iterators to shutdown. In this case we go to the else branch.
if !opts.DB.ShutdownCalled {
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
opts.DB.ItMut.Unlock()
} else {
opts.DB.ItMut.Unlock()
return ch
}
// Check if we've been told to shutdown in between getting created and getting here
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
opts.Grp.Done()
return ch
}
ro := grocksdb.NewDefaultReadOptions()
@ -369,11 +356,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
it.Close()
close(ch)
ro.Destroy()
if opts.DB != nil {
opts.DoneChan <- struct{}{}
opts.DB.ItMut.Lock()
delete(opts.DB.OpenIterators, iterKey)
opts.DB.ItMut.Unlock()
if opts.Grp != nil {
// opts.Grp.DoneNamed(iterKey)
opts.Grp.Done()
}
}()
@ -394,7 +379,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
}
if interruptRequested(opts.ShutdownChan) {
if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
return
}
}
@ -546,7 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
}
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
@ -555,7 +540,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
cfNames = append(cfNames, cfName)
}
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
cleanupFiles := func() {
err = os.RemoveAll(secondaryPath)
@ -565,7 +550,8 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
}
if err != nil {
return nil, cleanupFiles, err
cleanupFiles()
return nil, err
}
cleanupDB := func() {
@ -574,11 +560,11 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
}
db.Cleanup = cleanupDB
return db, cleanupDB, nil
return db, nil
}
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -614,11 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
LastState: nil,
Height: 0,
Headers: nil,
OpenIterators: make(map[string][]chan struct{}),
ItMut: sync.RWMutex{},
ShutdownChan: make(chan struct{}, 1),
ShutdownCalled: false,
DoneChan: make(chan struct{}, 1),
Grp: grp,
}
err = myDB.ReadDBState() //TODO: Figure out right place for this
@ -687,24 +669,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() {
log.Println("Sending message to ShutdownChan...")
db.ShutdownChan <- struct{}{}
log.Println("Locking iterator mutex...")
db.ItMut.Lock()
log.Println("Setting ShutdownCalled to true...")
db.ShutdownCalled = true
log.Println("Notifying iterators to shutdown...")
for _, it := range db.OpenIterators {
it[1] <- struct{}{}
}
log.Println("Waiting for iterators to shutdown...")
for _, it := range db.OpenIterators {
<-it[0]
}
log.Println("Unlocking iterator mutex...")
db.ItMut.Unlock()
log.Println("Sending message to DoneChan...")
<-db.DoneChan
db.Grp.StopAndWait()
log.Println("Calling cleanup...")
db.Cleanup()
log.Println("Leaving Shutdown...")
@ -714,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
db.Grp.Add(1)
go func() {
lastPrint := time.Now()
for {
@ -727,8 +693,8 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
log.Infof("Error detecting changes: %#v", err)
}
select {
case <-db.ShutdownChan:
db.DoneChan <- struct{}{}
case <-db.Grp.Ch():
db.Grp.Done()
return
case <-time.After(time.Millisecond * 10):
}

View file

@ -7,12 +7,12 @@ import (
"log"
"os"
"strings"
"sync"
"testing"
dbpkg "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
)
@ -21,7 +21,7 @@ import (
////////////////////////////////////////////////////////////////////////////////
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) {
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
log.Println(filePath)
file, err := os.Open(filePath)
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
reader := csv.NewReader(file)
records, err := reader.ReadAll()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
wOpts := grocksdb.NewDefaultWriteOptions()
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
opts.SetCreateIfMissing(true)
db, err := grocksdb.OpenDb(opts, "tmp")
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
log.Println(cfName)
handle, err := db.CreateColumnFamily(opts, cfName)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handleMap[cfName] = handle
}
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
for _, record := range records[1:] {
cf := record[0]
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handle := handleMap[string(cf)]
key, err := hex.DecodeString(record[1])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
val, err := hex.DecodeString(record[2])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
db.PutCF(wOpts, handle, key, val)
}
@ -94,11 +94,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
LastState: nil,
Height: 0,
Headers: nil,
OpenIterators: make(map[string][]chan struct{}),
ItMut: sync.RWMutex{},
ShutdownChan: make(chan struct{}, 1),
DoneChan: make(chan struct{}, 1),
ShutdownCalled: false,
Grp: stop.New(),
Cleanup: toDefer,
}
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
@ -108,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
err = myDB.InitTxCounts()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// err = dbpkg.InitHeaders(myDB)
@ -116,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
// return nil, nil, nil, err
// }
return myDB, records, toDefer, nil
return myDB, records, nil
}
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
@ -247,17 +244,18 @@ func CatCSV(filePath string) {
func TestCatFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
// url := "lbry://@lbry"
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
defer toDefer()
if err != nil {
t.Error(err)
return
@ -277,6 +275,7 @@ func TestCatFullDB(t *testing.T) {
// TestOpenFullDB Tests running a resolve on a full db.
func TestOpenFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
@ -284,11 +283,11 @@ func TestOpenFullDB(t *testing.T) {
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
// url := "lbry://@lbry$1"
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
defer toDefer()
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -302,12 +301,13 @@ func TestOpenFullDB(t *testing.T) {
func TestResolve(t *testing.T) {
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
filePath := "../testdata/FULL_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
expandedResolveResult := db.Resolve(url)
log.Printf("%#v\n", expandedResolveResult)
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
@ -321,11 +321,11 @@ func TestResolve(t *testing.T) {
func TestGetDBState(t *testing.T) {
filePath := "../testdata/s_resolve.csv"
want := uint32(1072108)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
state, err := db.GetDBState()
if err != nil {
t.Error(err)
@ -343,11 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/W_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -376,11 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/j_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -413,11 +413,11 @@ func TestGetRepost(t *testing.T) {
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
filePath := "../testdata/V_resolve.csv"
// want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
res, err := db.GetRepost(channelHash)
if err != nil {
@ -447,11 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
filePath := "../testdata/Z_resolve.csv"
want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
count, err := db.GetClaimsInChannelCount(channelHash)
if err != nil {
t.Error(err)
@ -476,11 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
var position uint16 = 0
filePath := "../testdata/F_resolve.csv"
log.Println(filePath)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
if err != nil {
t.Error(err)
@ -494,11 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
filePath := "../testdata/F_cat.csv"
normalName := "cat"
claimId := "0"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
ch := db.ClaimShortIdIter(normalName, claimId)
@ -524,11 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
var txNum uint32 = 1456296
var position uint16 = 0
filePath := "../testdata/G_2.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
val, err := db.GetCachedClaimHash(txNum, position)
if err != nil {
t.Error(err)
@ -552,11 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
var val []byte = nil
filePath := "../testdata/I_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
val, err = db.GetChannelForClaim(claimHash, txNum, position)
if err != nil {
@ -581,11 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
want := uint64(20000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, true)
@ -611,11 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
want := uint64(21000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, false)
@ -648,11 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
t.Error(err)
}
filePath := "../testdata/a_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
res, err := db.GetSupportAmount(claimHash)
if err != nil {
t.Error(err)
@ -668,11 +673,12 @@ func TestGetTxHash(t *testing.T) {
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
filePath := "../testdata/X_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
resHash, err := db.GetTxHash(txNum)
if err != nil {
t.Error(err)
@ -710,11 +716,12 @@ func TestGetActivation(t *testing.T) {
txNum := uint32(0x6284e3)
position := uint16(0x0)
want := uint32(0xa6b65)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
activation, err := db.GetActivation(txNum, position)
if err != nil {
t.Error(err)
@ -741,12 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
return
}
filePath := "../testdata/E_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
res, err := db.GetCachedClaimTxo(claimHash, true)
if err != nil {
t.Error(err)
@ -770,12 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
claimName := internal.NormalizeName("@Styxhexenhammer666")
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
filePath := "../testdata/P_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
res, err := db.GetControllingClaim(claimName)
if err != nil {
t.Error(err)

View file

@ -6,6 +6,7 @@ import (
"bytes"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
@ -22,12 +23,11 @@ type IterOptions struct {
IncludeValue bool
RawKey bool
RawValue bool
ShutdownChan chan struct{}
DoneChan chan struct{}
DB *ReadOnlyDBColumnFamily
CfHandle *grocksdb.ColumnFamilyHandle
It *grocksdb.Iterator
Serializer *prefixes.SerializationAPI
Grp *stop.Group
// DB *ReadOnlyDBColumnFamily
CfHandle *grocksdb.ColumnFamilyHandle
It *grocksdb.Iterator
Serializer *prefixes.SerializationAPI
}
// NewIterateOptions creates a defualt options structure for a db iterator.
@ -43,12 +43,11 @@ func NewIterateOptions() *IterOptions {
IncludeValue: false,
RawKey: false,
RawValue: false,
ShutdownChan: make(chan struct{}, 1),
DoneChan: make(chan struct{}, 1),
DB: nil,
CfHandle: nil,
It: nil,
Serializer: prefixes.ProductionAPI,
Grp: nil,
// DB: nil,
CfHandle: nil,
It: nil,
Serializer: prefixes.ProductionAPI,
}
}
@ -108,7 +107,9 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
}
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
o.DB = db
// o.Grp.AddNamed(1, iterKey)
o.Grp = stop.New(db.Grp)
o.Grp.Add(1)
return o
}

5
go.mod
View file

@ -24,9 +24,6 @@ require (
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
)
require (
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
@ -43,7 +40,7 @@ require (
github.com/prometheus/procfs v0.6.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect

4
go.sum
View file

@ -358,8 +358,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/lbryio/lbcd v0.22.100-beta/go.mod h1:u8SaFX4xdGMMR5xasBGfgApC8pvD4rnK2OujZnrq5gs=
github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VAnRsattCSyKOO55g=
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
@ -375,8 +373,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=

12
main.go
View file

@ -10,6 +10,7 @@ import (
"github.com/lbryio/herald.go/internal"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
@ -27,13 +28,16 @@ func main() {
if args.CmdType == server.ServeCmd {
// This will cancel goroutines with the server finishes.
ctxWCancel, cancel := context.WithCancel(ctx)
defer cancel()
// ctxWCancel, cancel := context.WithCancel(ctx)
// defer cancel()
stopGroup := stop.New()
// defer stopGroup.Stop()
initsignals()
initsignals(stopGroup.Ch())
interrupt := interruptListener()
s := server.MakeHubServer(ctxWCancel, args)
// s := server.MakeHubServer(ctxWCancel, args)
s := server.MakeHubServer(stopGroup, args)
go s.Run()
defer func() {

View file

@ -88,7 +88,7 @@ test_command_with_want
### blockchain.block.get_chunk
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}'
| jq .result | sed 's/"//g' | head -c 100
EOM
@ -97,7 +97,7 @@ test_command_with_want
### blockchain.block.get_header
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.get_header", "params": []}'
| jq .result.timestamp
EOM
@ -106,7 +106,7 @@ test_command_with_want
### blockchain.block.headers
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.block.headers", "params": []}'
| jq .result.count
EOM
@ -116,7 +116,7 @@ test_command_with_want
## blockchain.claimtrie
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}'
| jq .result.txos[0].tx_hash | sed 's/"//g'
EOM
@ -128,7 +128,7 @@ test_command_with_want
### blockchain.address.get_balance
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .result.confirmed
EOM
@ -138,7 +138,7 @@ test_command_with_want
## blockchain.address.get_history
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result.confirmed | length'
EOM
@ -148,7 +148,7 @@ test_command_with_want
## blockchain.address.listunspent
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq '.result | length'
EOM
@ -160,7 +160,7 @@ test_command_with_want
## blockchain.scripthash.get_mempool
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
@ -170,7 +170,7 @@ test_command_with_want
## blockchain.scripthash.get_history
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
@ -180,13 +180,42 @@ test_command_with_want
## blockchain.scripthash.listunspent
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json"
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}'
| jq .error | sed 's/"//g'
EOM
WANT="encoding/hex: invalid byte: U+0047 'G'"
test_command_with_want
## server.banner
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.banner", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="You are connected to an 0.107.0 server."
test_command_with_want
## server.version
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.version", "params":[]}'
| jq .result | sed 's/"//g'
EOM
WANT="0.107.0"
test_command_with_want
## server.features
read -r -d '' CMD <<- EOM
curl http://127.0.0.1:50002/rpc -s -H "Content-Type: application/json"
--data '{"id": 1, "method": "server.features", "params":[]}'
EOM
WANT='{"result":{"hosts":{},"pruning":"","server_version":"0.107.0","protocol_min":"0.54.0","protocol_max":"0.199.0","genesis_hash":"9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463","description":"Herald","payment_address":"","donation_address":"","daily_fee":"1.0","hash_function":"sha256","trending_algorithm":"fast_ar"},"error":null,"id":1}'
test_command_with_want
# metrics endpoint testing
WANT=0

View file

@ -1,6 +1,7 @@
package server
import (
"fmt"
"log"
"os"
"strconv"
@ -19,26 +20,37 @@ const (
// Args struct contains the arguments to the hub server.
type Args struct {
CmdType int
Host string
Port string
DBPath string
Chain *string
EsHost string
EsPort string
PrometheusPort string
NotifierPort string
JSONRPCPort int
JSONRPCHTTPPort int
MaxSessions int
SessionTimeout int
EsIndex string
RefreshDelta int
CacheTTL int
PeerFile string
Country string
BlockingChannelIds []string
FilteringChannelIds []string
CmdType int
Host string
Port string
DBPath string
Chain *string
EsHost string
EsPort string
PrometheusPort string
NotifierPort string
JSONRPCPort int
JSONRPCHTTPPort int
MaxSessions int
SessionTimeout int
EsIndex string
RefreshDelta int
CacheTTL int
PeerFile string
Banner *string
Country string
BlockingChannelIds []string
FilteringChannelIds []string
GenesisHash string
ServerVersion string
ProtocolMin string
ProtocolMax string
ServerDescription string
PaymentAddress string
DonationAddress string
DailyFee string
Debug bool
DisableEs bool
DisableLoadPeers bool
@ -54,21 +66,32 @@ type Args struct {
}
const (
DefaultHost = "0.0.0.0"
DefaultPort = "50051"
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims"
DefaultEsPort = "9200"
DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080"
DefaultJSONRPCPort = 50001
DefaultMaxSessions = 10000
DefaultSessionTimeout = 300
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
DefaultCountry = "US"
DefaultHost = "0.0.0.0"
DefaultPort = "50051"
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims"
DefaultEsPort = "9200"
DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080"
DefaultJSONRPCPort = 50001
DefaultJSONRPCHTTPPort = 50002
DefaultMaxSessions = 10000
DefaultSessionTimeout = 300
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
DefaultBannerFile = ""
DefaultCountry = "US"
HUB_PROTOCOL_VERSION = "0.107.0"
PROTOCOL_MIN = "0.54.0"
PROTOCOL_MAX = "0.199.0"
DefaultServerDescription = "Herald"
DefaultPaymentAddress = ""
DefaultDonationAddress = ""
DefaultDailyFee = "1.0"
DefaultDisableLoadPeers = false
DefaultDisableStartPrometheus = false
DefaultDisableStartUDP = false
@ -86,6 +109,73 @@ var (
DefaultFilteringChannelIds = []string{}
)
func loadBanner(bannerFile *string, serverVersion string) *string {
var banner string
data, err := os.ReadFile(*bannerFile)
if err != nil {
banner = fmt.Sprintf("You are connected to an %s server.", serverVersion)
} else {
banner = string(data)
}
/*
banner := os.Getenv("BANNER")
if banner == "" {
return nil
}
*/
return &banner
}
// MakeDefaultArgs creates a default set of arguments for testing the server.
func MakeDefaultTestArgs() *Args {
args := &Args{
CmdType: ServeCmd,
Host: DefaultHost,
Port: DefaultPort,
DBPath: DefaultDBPath,
EsHost: DefaultEsHost,
EsPort: DefaultEsPort,
PrometheusPort: DefaultPrometheusPort,
NotifierPort: DefaultNotifierPort,
JSONRPCPort: DefaultJSONRPCPort,
JSONRPCHTTPPort: DefaultJSONRPCHTTPPort,
MaxSessions: DefaultMaxSessions,
SessionTimeout: DefaultSessionTimeout,
EsIndex: DefaultEsIndex,
RefreshDelta: DefaultRefreshDelta,
CacheTTL: DefaultCacheTTL,
PeerFile: DefaultPeerFile,
Banner: nil,
Country: DefaultCountry,
GenesisHash: chaincfg.TestNet3Params.GenesisHash.String(),
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: DefaultServerDescription,
PaymentAddress: DefaultPaymentAddress,
DonationAddress: DefaultDonationAddress,
DailyFee: DefaultDailyFee,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// GetEnvironment takes the environment variables as an array of strings
// and a getkeyval function to turn it into a map.
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
@ -111,7 +201,7 @@ func GetEnvironmentStandard() map[string]string {
func ParseArgs(searchRequest *pb.SearchRequest) *Args {
environment := GetEnvironmentStandard()
parser := argparse.NewParser("hub", "hub server and client")
parser := argparse.NewParser("herald", "herald server and client")
serveCmd := parser.NewCommand("serve", "start the hub server")
searchCmd := parser.NewCommand("search", "claim search")
@ -122,6 +212,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
return err
}
// main server config arguments
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
@ -131,18 +222,26 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort, Default: DefaultJSONRPCPort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort, Default: DefaultJSONRPCHTTPPort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile})
bannerFile := parser.String("", "bannerfile", &argparse.Options{Required: false, Help: "Banner file server.banner", Default: DefaultBannerFile})
country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry})
blockingChannelIds := parser.StringList("", "blocking-channel-ids", &argparse.Options{Required: false, Help: "Blocking channel ids", Default: DefaultBlockingChannelIds})
filteringChannelIds := parser.StringList("", "filtering-channel-ids", &argparse.Options{Required: false, Help: "Filtering channel ids", Default: DefaultFilteringChannelIds})
// arguments for server features
serverDescription := parser.String("", "server-description", &argparse.Options{Required: false, Help: "Server description", Default: DefaultServerDescription})
paymentAddress := parser.String("", "payment-address", &argparse.Options{Required: false, Help: "Payment address", Default: DefaultPaymentAddress})
donationAddress := parser.String("", "donation-address", &argparse.Options{Required: false, Help: "Donation address", Default: DefaultDonationAddress})
dailyFee := parser.String("", "daily-fee", &argparse.Options{Required: false, Help: "Daily fee", Default: DefaultDailyFee})
// flags for disabling features
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
@ -156,6 +255,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
// search command arguments
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
@ -177,27 +277,40 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
*jsonRPCPort = DefaultJSONRPCPort
}
banner := loadBanner(bannerFile, HUB_PROTOCOL_VERSION)
args := &Args{
CmdType: SearchCmd,
Host: *host,
Port: *port,
DBPath: *dbPath,
Chain: chain,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,
NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort,
JSONRPCHTTPPort: *jsonRPCHTTPPort,
MaxSessions: *maxSessions,
SessionTimeout: *sessionTimeout,
EsIndex: *esIndex,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
PeerFile: *peerFile,
Country: *country,
BlockingChannelIds: *blockingChannelIds,
FilteringChannelIds: *filteringChannelIds,
CmdType: SearchCmd,
Host: *host,
Port: *port,
DBPath: *dbPath,
Chain: chain,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,
NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort,
JSONRPCHTTPPort: *jsonRPCHTTPPort,
MaxSessions: *maxSessions,
SessionTimeout: *sessionTimeout,
EsIndex: *esIndex,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
PeerFile: *peerFile,
Banner: banner,
Country: *country,
BlockingChannelIds: *blockingChannelIds,
FilteringChannelIds: *filteringChannelIds,
GenesisHash: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
ServerDescription: *serverDescription,
PaymentAddress: *paymentAddress,
DonationAddress: *donationAddress,
DailyFee: *dailyFee,
Debug: *debug,
DisableEs: *disableEs,
DisableLoadPeers: *disableLoadPeers,

View file

@ -12,7 +12,8 @@ import (
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
dto "github.com/prometheus/client_model/go"
"google.golang.org/grpc"
)
@ -44,43 +45,11 @@ func removeFile(fileName string) {
}
}
// makeDefaultArgs creates a default set of arguments for testing the server.
func makeDefaultArgs() *server.Args {
args := &server.Args{
CmdType: server.ServeCmd,
Host: server.DefaultHost,
Port: server.DefaultPort,
DBPath: server.DefaultDBPath,
EsHost: server.DefaultEsHost,
EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort,
NotifierPort: server.DefaultNotifierPort,
JSONRPCPort: server.DefaultJSONRPCPort,
EsIndex: server.DefaultEsIndex,
RefreshDelta: server.DefaultRefreshDelta,
CacheTTL: server.DefaultCacheTTL,
PeerFile: server.DefaultPeerFile,
Country: server.DefaultCountry,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args
}
// TestAddPeer tests the ability to add peers
func TestAddPeer(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
tests := []struct {
name string
@ -137,8 +106,9 @@ func TestAddPeer(t *testing.T) {
// TestPeerWriter tests that peers get written properly
func TestPeerWriter(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableWritePeers = false
tests := []struct {
@ -193,9 +163,10 @@ func TestPeerWriter(t *testing.T) {
// TestAddPeerEndpoint tests the ability to add peers
func TestAddPeerEndpoint(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args2.Port = "50052"
tests := []struct {
@ -264,10 +235,11 @@ func TestAddPeerEndpoint(t *testing.T) {
// TestAddPeerEndpoint2 tests the ability to add peers
func TestAddPeerEndpoint2(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
args3 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args3.Port = "50053"
@ -345,10 +317,11 @@ func TestAddPeerEndpoint2(t *testing.T) {
// TestAddPeerEndpoint3 tests the ability to add peers
func TestAddPeerEndpoint3(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args2 := makeDefaultArgs()
args3 := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args3.Port = "50053"
@ -434,10 +407,11 @@ func TestAddPeerEndpoint3(t *testing.T) {
// TestAddPeer tests the ability to add peers
func TestUDPServer(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false
args2 := makeDefaultArgs()
args2 := server.MakeDefaultTestArgs()
args2.Port = "50052"
args2.DisableStartUDP = false

View file

@ -105,6 +105,7 @@ func newBlockHeaderElectrum(header *[HEADER_SIZE]byte, height uint32) *BlockHead
type BlockGetServerHeightReq struct{}
type BlockGetServerHeightResp uint32
// blockchain.block.get_server_height
func (s *BlockchainBlockService) Get_server_height(req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
if s.DB == nil || s.DB.LastState == nil {
return fmt.Errorf("unknown height")

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcutil"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
@ -57,8 +58,9 @@ var regTestAddrs = [30]string{
func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -87,8 +89,9 @@ func TestServerGetHeight(t *testing.T) {
func TestGetChunk(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -130,8 +133,9 @@ func TestGetChunk(t *testing.T) {
func TestGetHeader(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -159,8 +163,9 @@ func TestGetHeader(t *testing.T) {
func TestHeaders(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -189,15 +194,17 @@ func TestHeaders(t *testing.T) {
}
func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()
@ -209,13 +216,13 @@ func TestHeadersSubscribe(t *testing.T) {
// Set up logic to read a notification.
var received sync.WaitGroup
recv := func(client net.Conn) {
defer received.Done()
buf := make([]byte, 1024)
len, err := client.Read(buf)
if err != nil {
t.Errorf("read err: %v", err)
}
t.Logf("len: %v notification: %v", len, string(buf))
received.Done()
}
received.Add(2)
go recv(client1)
@ -281,8 +288,9 @@ func TestHeadersSubscribe(t *testing.T) {
func TestGetBalance(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -310,8 +318,9 @@ func TestGetBalance(t *testing.T) {
func TestGetHistory(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -339,8 +348,9 @@ func TestGetHistory(t *testing.T) {
func TestListUnspent(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
grp := stop.NewDebug()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -367,15 +377,17 @@ func TestListUnspent(t *testing.T) {
}
func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.NewDebug()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()

89
server/jsonrpc_server.go Normal file
View file

@ -0,0 +1,89 @@
package server
import (
log "github.com/sirupsen/logrus"
)
type ServerService struct {
Args *Args
}
type ServerFeatureService struct {
Args *Args
}
type ServerFeaturesReq struct{}
type ServerFeaturesRes struct {
Hosts map[string]string `json:"hosts"`
Pruning string `json:"pruning"`
ServerVersion string `json:"server_version"`
ProtocolMin string `json:"protocol_min"`
ProtocolMax string `json:"protocol_max"`
GenesisHash string `json:"genesis_hash"`
Description string `json:"description"`
PaymentAddress string `json:"payment_address"`
DonationAddress string `json:"donation_address"`
DailyFee string `json:"daily_fee"`
HashFunction string `json:"hash_function"`
TrendingAlgorithm string `json:"trending_algorithm"`
}
// Features is the json rpc endpoint for 'server.features'.
func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
log.Println("Features")
features := &ServerFeaturesRes{
Hosts: map[string]string{},
Pruning: "",
ServerVersion: HUB_PROTOCOL_VERSION,
ProtocolMin: PROTOCOL_MIN,
ProtocolMax: PROTOCOL_MAX,
GenesisHash: t.Args.GenesisHash,
Description: t.Args.ServerDescription,
PaymentAddress: t.Args.PaymentAddress,
DonationAddress: t.Args.DonationAddress,
DailyFee: t.Args.DailyFee,
HashFunction: "sha256",
TrendingAlgorithm: "fast_ar",
}
*res = features
return nil
}
type ServerBannerService struct {
Args *Args
}
type ServerBannerReq struct{}
type ServerBannerRes string
// Banner is the json rpc endpoint for 'server.banner'.
func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
log.Println("Banner")
*res = (*ServerBannerRes)(t.Args.Banner)
return nil
}
type ServerVersionService struct {
Args *Args
}
type ServerVersionReq struct{}
type ServerVersionRes string
// Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version")
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
return nil
}

View file

@ -121,6 +121,14 @@ fail1:
goto fail2
}
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{s.Args}
err = s1.RegisterTCPService(serverSvc, "server")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1)
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCHTTPPort), 10)

View file

@ -1,7 +1,6 @@
package server_test
import (
"context"
"encoding/hex"
"fmt"
"net"
@ -10,6 +9,7 @@ import (
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/sirupsen/logrus"
)
@ -47,8 +47,9 @@ func tcpRead(conn net.Conn) ([]byte, error) {
}
func TestNotifierServer(t *testing.T) {
args := makeDefaultArgs()
ctx := context.Background()
args := server.MakeDefaultTestArgs()
// ctx := context.Background()
ctx := stop.NewDebug()
hub := server.MakeHubServer(ctx, args)
go hub.NotifierServer()

View file

@ -11,6 +11,7 @@ import (
pb "github.com/lbryio/herald.go/protobuf/go"
server "github.com/lbryio/herald.go/server"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7"
)
@ -55,13 +56,14 @@ func TestSearch(t *testing.T) {
w.Write([]byte(resp))
}
context := context.Background()
args := makeDefaultArgs()
hubServer := server.MakeHubServer(context, args)
ctx := context.Background()
stopGroup := stop.NewDebug()
args := server.MakeDefaultTestArgs()
hubServer := server.MakeHubServer(stopGroup, args)
req := &pb.SearchRequest{
Text: "asdf",
}
out, err := hubServer.Search(context, req)
out, err := hubServer.Search(ctx, req)
if err != nil {
log.Println(err)
}

View file

@ -22,6 +22,7 @@ import (
"github.com/lbryio/herald.go/meta"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -53,6 +54,7 @@ type Server struct {
HeightSubs map[net.Addr]net.Conn
HeightSubsMut sync.RWMutex
NotifierChan chan interface{}
Grp *stop.Group
sessionManager *sessionManager
pb.UnimplementedHubServer
}
@ -149,7 +151,7 @@ func (s *Server) Run() {
}
}
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil {
logrus.Info(err)
@ -159,10 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
if err != nil {
logrus.Info(err)
}
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
if err != nil {
// Can't load the db, fail loudly
logrus.Info(err)
@ -217,7 +216,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
// MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines.
func MakeHubServer(ctx context.Context, args *Args) *Server {
func MakeHubServer(grp *stop.Group, args *Args) *Server {
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0))
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
@ -266,14 +265,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
//TODO: is this the right place to load the db?
var myDB *db.ReadOnlyDBColumnFamily
// var dbShutdown = func() {}
if !args.DisableResolve {
myDB, err = LoadDatabase(args)
myDB, err = LoadDatabase(args, grp)
if err != nil {
logrus.Warning(err)
}
}
// Determine which chain to use based on db and cli values
dbChain := (*chaincfg.Params)(nil)
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
// The chain params can be inferred from DBStateValue.
@ -310,6 +309,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}
logrus.Infof("network: %v", chain.Name)
args.GenesisHash = chain.GenesisHash.String()
sessionGrp := stop.New(grp)
s := &Server{
GrpcServer: grpcServer,
Args: args,
@ -333,7 +336,8 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}),
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout),
Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
}
// Start up our background services

View file

@ -1,5 +1,11 @@
package server
import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
return s.addPeer
}
@ -7,3 +13,7 @@ func (s *Server) AddPeerExported() func(*Peer, bool, bool) error {
func (s *Server) GetNumPeersExported() func() int64 {
return s.getNumPeers
}
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(db, args, grp, chain)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
@ -114,13 +115,15 @@ type sessionMap map[uintptr]*session
type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state
sessionsMut sync.RWMutex
sessions sessionMap
sessionsWait sync.WaitGroup
sessionsMut sync.RWMutex
sessions sessionMap
// sessionsWait sync.WaitGroup
grp *stop.Group
sessionsMax int
sessionTimeout time.Duration
manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily
args *Args
chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap
@ -128,13 +131,15 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap
}
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{
sessions: make(sessionMap),
sessionsMax: sessionsMax,
sessionTimeout: time.Duration(sessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
grp: grp,
sessionsMax: args.MaxSessions,
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db,
args: args,
chain: chain,
headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
@ -142,6 +147,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
}
func (sm *sessionManager) start() {
sm.grp.Add(1)
go sm.manage()
}
@ -168,7 +174,13 @@ func (sm *sessionManager) manage() {
}
sm.sessionsMut.Unlock()
// Wait for next management clock tick.
<-sm.manageTicker.C
select {
case <-sm.grp.Ch():
sm.grp.Done()
return
case <-sm.manageTicker.C:
continue
}
}
}
@ -190,11 +202,18 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
// each request and update subscriptions.
s1 := rpc.NewServer()
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{sm.args}
err := s1.RegisterName("server", serverSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db}
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil {
log.Errorf("RegisterService: %v\n", err)
log.Errorf("RegisterName: %v\n", err)
}
// Register other "blockchain.{block,address,scripthash}.*" handlers.
@ -220,11 +239,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
goto fail
}
sm.sessionsWait.Add(1)
sm.grp.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done()
sm.grp.Done()
}()
return sess

View file

@ -11,7 +11,7 @@ import (
// TestUDPPing tests UDPPing correctness against prod server.
func TestUDPPing(t *testing.T) {
args := makeDefaultArgs()
args := server.MakeDefaultTestArgs()
args.DisableStartUDP = true
tests := []struct {

View file

@ -8,12 +8,13 @@ import (
"os"
"os/signal"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
// shutdownRequestChannel is used to initiate shutdown from one of the
// subsystems using the same code paths as when an interrupt signal is received.
var shutdownRequestChannel = make(chan struct{})
var shutdownRequestChannel = make(stop.Chan)
// interruptSignals defines the default signals to catch in order to do a proper
// shutdown. This may be modified during init depending on the platform.

View file

@ -10,9 +10,12 @@ package main
import (
"os"
"syscall"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// initsignals sets the signals to be caught by the signal handler
func initsignals() {
func initsignals(stopCh stop.Chan) {
shutdownRequestChannel = stopCh
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}