currently broken, incorporated stop groups into the session manager

This commit is contained in:
Jeffrey Picard 2022-10-17 11:45:45 +00:00
parent 31e91f67c8
commit a198113f73
5 changed files with 123 additions and 57 deletions

View file

@ -531,7 +531,8 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
}
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, func(), error) {
grp.Add(1)
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
@ -540,7 +541,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
cfNames = append(cfNames, cfName)
}
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp)
cleanupFiles := func() {
err = os.RemoveAll(secondaryPath)
@ -563,7 +564,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
}
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -599,7 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
LastState: nil,
Height: 0,
Headers: nil,
Grp: nil,
Grp: grp,
}
err = myDB.ReadDBState() //TODO: Figure out right place for this

View file

@ -12,6 +12,7 @@ import (
dbpkg "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/linxGnu/grocksdb"
)
@ -93,7 +94,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
LastState: nil,
Height: 0,
Headers: nil,
Grp: nil,
Grp: stop.New(),
}
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
@ -242,17 +243,20 @@ func CatCSV(filePath string) {
func TestCatFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
// url := "lbry://@lbry"
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
@ -272,6 +276,7 @@ func TestCatFullDB(t *testing.T) {
// TestOpenFullDB Tests running a resolve on a full db.
func TestOpenFullDB(t *testing.T) {
t.Skip("Skipping full db test")
grp := stop.New()
// url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c"
// "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1"
// url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
@ -279,11 +284,13 @@ func TestOpenFullDB(t *testing.T) {
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
// url := "lbry://@lbry$1"
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
defer toDefer()
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
// defer toDefer()
db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
@ -298,11 +305,13 @@ func TestResolve(t *testing.T) {
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
filePath := "../testdata/FULL_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
expandedResolveResult := db.Resolve(url)
log.Printf("%#v\n", expandedResolveResult)
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
@ -317,10 +326,12 @@ func TestGetDBState(t *testing.T) {
filePath := "../testdata/s_resolve.csv"
want := uint32(1072108)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
state, err := db.GetDBState()
if err != nil {
t.Error(err)
@ -339,10 +350,12 @@ func TestGetRepostedClaim(t *testing.T) {
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/W_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -372,10 +385,12 @@ func TestGetRepostedCount(t *testing.T) {
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/j_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -409,10 +424,12 @@ func TestGetRepost(t *testing.T) {
filePath := "../testdata/V_resolve.csv"
// want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetRepost(channelHash)
if err != nil {
@ -443,10 +460,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
filePath := "../testdata/Z_resolve.csv"
want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetClaimsInChannelCount(channelHash)
if err != nil {
t.Error(err)
@ -472,10 +491,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
filePath := "../testdata/F_resolve.csv"
log.Println(filePath)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
if err != nil {
t.Error(err)
@ -490,10 +511,12 @@ func TestClaimShortIdIter(t *testing.T) {
normalName := "cat"
claimId := "0"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
ch := db.ClaimShortIdIter(normalName, claimId)
@ -520,10 +543,12 @@ func TestGetTXOToClaim(t *testing.T) {
var position uint16 = 0
filePath := "../testdata/G_2.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
val, err := db.GetCachedClaimHash(txNum, position)
if err != nil {
t.Error(err)
@ -548,10 +573,12 @@ func TestGetClaimToChannel(t *testing.T) {
filePath := "../testdata/I_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
val, err = db.GetChannelForClaim(claimHash, txNum, position)
if err != nil {
@ -577,10 +604,12 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, true)
@ -607,10 +636,12 @@ func TestGetEffectiveAmount(t *testing.T) {
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, false)
@ -644,10 +675,12 @@ func TestGetSupportAmount(t *testing.T) {
}
filePath := "../testdata/a_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetSupportAmount(claimHash)
if err != nil {
t.Error(err)
@ -664,10 +697,12 @@ func TestGetTxHash(t *testing.T) {
filePath := "../testdata/X_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
resHash, err := db.GetTxHash(txNum)
if err != nil {
t.Error(err)
@ -706,10 +741,12 @@ func TestGetActivation(t *testing.T) {
position := uint16(0x0)
want := uint32(0xa6b65)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
activation, err := db.GetActivation(txNum, position)
if err != nil {
t.Error(err)
@ -737,11 +774,13 @@ func TestGetClaimToTXO(t *testing.T) {
}
filePath := "../testdata/E_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetCachedClaimTxo(claimHash, true)
if err != nil {
t.Error(err)
@ -766,11 +805,13 @@ func TestGetControllingClaim(t *testing.T) {
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
filePath := "../testdata/P_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
defer toDefer()
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetControllingClaim(claimName)
if err != nil {
t.Error(err)

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcutil"
"github.com/lbryio/lbry.go/v3/extras/stop"
)
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
@ -57,7 +58,8 @@ var regTestAddrs = [30]string{
func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -87,7 +89,8 @@ func TestServerGetHeight(t *testing.T) {
func TestGetChunk(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -130,7 +133,8 @@ func TestGetChunk(t *testing.T) {
func TestGetHeader(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -159,7 +163,8 @@ func TestGetHeader(t *testing.T) {
func TestHeaders(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -190,15 +195,19 @@ func TestHeaders(t *testing.T) {
func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.New()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
db, _, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
// defer toDefer()
defer db.Shutdown()
defer db.Grp.Done()
// db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()
@ -210,13 +219,13 @@ func TestHeadersSubscribe(t *testing.T) {
// Set up logic to read a notification.
var received sync.WaitGroup
recv := func(client net.Conn) {
defer received.Done()
buf := make([]byte, 1024)
len, err := client.Read(buf)
if err != nil {
t.Errorf("read err: %v", err)
}
t.Logf("len: %v notification: %v", len, string(buf))
received.Done()
}
received.Add(2)
go recv(client1)
@ -282,7 +291,8 @@ func TestHeadersSubscribe(t *testing.T) {
func TestGetBalance(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -311,7 +321,8 @@ func TestGetBalance(t *testing.T) {
func TestGetHistory(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -340,7 +351,8 @@ func TestGetHistory(t *testing.T) {
func TestListUnspent(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
@ -369,15 +381,16 @@ func TestListUnspent(t *testing.T) {
func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.New()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
if err != nil {
t.Error(err)
return
}
sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()

View file

@ -151,7 +151,7 @@ func (s *Server) Run() {
}
}
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil {
logrus.Info(err)
@ -161,7 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
if err != nil {
logrus.Info(err)
}
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
myDB, _, err := db.GetProdDB(args.DBPath, tmpName, grp)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
@ -270,12 +270,12 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
var myDB *db.ReadOnlyDBColumnFamily
// var dbShutdown = func() {}
if !args.DisableResolve {
myDB, err = LoadDatabase(args)
myDB, err = LoadDatabase(args, grp)
if err != nil {
logrus.Warning(err)
}
myDB.Grp = stop.New(grp)
myDB.Grp.Add(1)
// myDB.Grp = stop.New(grp)
// myDB.Grp.Add(1)
}
// Determine which chain to use based on db and cli values
@ -317,6 +317,8 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
args.GenesisHash = chain.GenesisHash.String()
sessionGrp := stop.New(grp)
s := &Server{
GrpcServer: grpcServer,
Args: args,
@ -341,7 +343,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}),
Grp: grp,
sessionManager: newSessionManager(myDB, args, &chain, args.MaxSessions, args.SessionTimeout),
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
}
// Start up our background services

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
@ -116,7 +117,8 @@ type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state
sessionsMut sync.RWMutex
sessions sessionMap
sessionsWait sync.WaitGroup
// sessionsWait sync.WaitGroup
grp *stop.Group
sessionsMax int
sessionTimeout time.Duration
manageTicker *time.Ticker
@ -129,12 +131,13 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap
}
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{
sessions: make(sessionMap),
sessionsMax: sessionsMax,
sessionTimeout: time.Duration(sessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
grp: grp,
sessionsMax: args.MaxSessions,
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db,
args: args,
chain: chain,
@ -170,7 +173,12 @@ func (sm *sessionManager) manage() {
}
sm.sessionsMut.Unlock()
// Wait for next management clock tick.
<-sm.manageTicker.C
select {
case <-sm.grp.Ch():
return
case <-sm.manageTicker.C:
continue
}
}
}
@ -229,11 +237,12 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
goto fail
}
sm.sessionsWait.Add(1)
sm.grp.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done()
//sm.sessionsWait.Done()
sm.grp.Done()
}()
return sess