diff --git a/db/db.go b/db/db.go index 1df1eb9..c0aaf1e 100644 --- a/db/db.go +++ b/db/db.go @@ -531,7 +531,8 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er } // GetProdDB returns a db that is used for production. -func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) { +func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, func(), error) { + grp.Add(1) prefixNames := prefixes.GetPrefixes() // additional prefixes that aren't in the code explicitly cfNames := []string{"default", "e", "d", "c"} @@ -540,7 +541,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func cfNames = append(cfNames, cfName) } - db, err := GetDBColumnFamilies(name, secondaryPath, cfNames) + db, err := GetDBColumnFamilies(name, secondaryPath, cfNames, grp) cleanupFiles := func() { err = os.RemoveAll(secondaryPath) @@ -563,7 +564,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func } // GetDBColumnFamilies gets a db with the specified column families and secondary path. -func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) { +func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) { opts := grocksdb.NewDefaultOptions() roOpts := grocksdb.NewDefaultReadOptions() cfOpt := grocksdb.NewDefaultOptions() @@ -599,7 +600,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R LastState: nil, Height: 0, Headers: nil, - Grp: nil, + Grp: grp, } err = myDB.ReadDBState() //TODO: Figure out right place for this diff --git a/db/db_test.go b/db/db_test.go index 7bfd19d..792b62a 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -12,6 +12,7 @@ import ( dbpkg "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/internal" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" ) @@ -93,7 +94,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami LastState: nil, Height: 0, Headers: nil, - Grp: nil, + Grp: stop.New(), } // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this @@ -242,17 +243,20 @@ func CatCSV(filePath string) { func TestCatFullDB(t *testing.T) { t.Skip("Skipping full db test") + grp := stop.New() // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" // url := "lbry://@lbry" // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" - dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" + dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) + db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + defer db.Shutdown() - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer if err != nil { t.Error(err) return @@ -272,6 +276,7 @@ func TestCatFullDB(t *testing.T) { // TestOpenFullDB Tests running a resolve on a full db. func TestOpenFullDB(t *testing.T) { t.Skip("Skipping full db test") + grp := stop.New() // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" @@ -279,11 +284,13 @@ func TestOpenFullDB(t *testing.T) { // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" // url := "lbry://@lbry$1" url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c" - dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/" + dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath) - defer toDefer() + db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + defer db.Shutdown() + // defer toDefer() + db.Cleanup = toDefer if err != nil { t.Error(err) return @@ -298,11 +305,13 @@ func TestResolve(t *testing.T) { url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" filePath := "../testdata/FULL_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer expandedResolveResult := db.Resolve(url) log.Printf("%#v\n", expandedResolveResult) if expandedResolveResult != nil && expandedResolveResult.Channel != nil { @@ -317,10 +326,12 @@ func TestGetDBState(t *testing.T) { filePath := "../testdata/s_resolve.csv" want := uint32(1072108) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer state, err := db.GetDBState() if err != nil { t.Error(err) @@ -339,10 +350,12 @@ func TestGetRepostedClaim(t *testing.T) { channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/W_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -372,10 +385,12 @@ func TestGetRepostedCount(t *testing.T) { channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/j_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -409,10 +424,12 @@ func TestGetRepost(t *testing.T) { filePath := "../testdata/V_resolve.csv" // want := uint32(3670) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer res, err := db.GetRepost(channelHash) if err != nil { @@ -443,10 +460,12 @@ func TestGetClaimsInChannelCount(t *testing.T) { filePath := "../testdata/Z_resolve.csv" want := uint32(3670) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer count, err := db.GetClaimsInChannelCount(channelHash) if err != nil { t.Error(err) @@ -472,10 +491,12 @@ func TestGetShortClaimIdUrl(t *testing.T) { filePath := "../testdata/F_resolve.csv" log.Println(filePath) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position) if err != nil { t.Error(err) @@ -490,10 +511,12 @@ func TestClaimShortIdIter(t *testing.T) { normalName := "cat" claimId := "0" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer ch := db.ClaimShortIdIter(normalName, claimId) @@ -520,10 +543,12 @@ func TestGetTXOToClaim(t *testing.T) { var position uint16 = 0 filePath := "../testdata/G_2.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer val, err := db.GetCachedClaimHash(txNum, position) if err != nil { t.Error(err) @@ -548,10 +573,12 @@ func TestGetClaimToChannel(t *testing.T) { filePath := "../testdata/I_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer val, err = db.GetChannelForClaim(claimHash, txNum, position) if err != nil { @@ -577,10 +604,12 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) { claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, true) @@ -607,10 +636,12 @@ func TestGetEffectiveAmount(t *testing.T) { claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, false) @@ -644,10 +675,12 @@ func TestGetSupportAmount(t *testing.T) { } filePath := "../testdata/a_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer res, err := db.GetSupportAmount(claimHash) if err != nil { t.Error(err) @@ -664,10 +697,12 @@ func TestGetTxHash(t *testing.T) { filePath := "../testdata/X_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer resHash, err := db.GetTxHash(txNum) if err != nil { t.Error(err) @@ -706,10 +741,12 @@ func TestGetActivation(t *testing.T) { position := uint16(0x0) want := uint32(0xa6b65) db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer activation, err := db.GetActivation(txNum, position) if err != nil { t.Error(err) @@ -737,11 +774,13 @@ func TestGetClaimToTXO(t *testing.T) { } filePath := "../testdata/E_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer res, err := db.GetCachedClaimTxo(claimHash, true) if err != nil { t.Error(err) @@ -766,11 +805,13 @@ func TestGetControllingClaim(t *testing.T) { claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd" filePath := "../testdata/P_resolve.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + defer db.Shutdown() if err != nil { t.Error(err) return } - defer toDefer() + // defer toDefer() + db.Cleanup = toDefer res, err := db.GetControllingClaim(claimName) if err != nil { t.Error(err) diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index 74f5381..e9d870a 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -13,6 +13,7 @@ import ( "github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/txscript" "github.com/lbryio/lbcutil" + "github.com/lbryio/lbry.go/v3/extras/stop" ) // Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions) @@ -57,7 +58,8 @@ var regTestAddrs = [30]string{ func TestServerGetHeight(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -87,7 +89,8 @@ func TestServerGetHeight(t *testing.T) { func TestGetChunk(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -130,7 +133,8 @@ func TestGetChunk(t *testing.T) { func TestGetHeader(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -159,7 +163,8 @@ func TestGetHeader(t *testing.T) { func TestHeaders(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -190,15 +195,19 @@ func TestHeaders(t *testing.T) { func TestHeadersSubscribe(t *testing.T) { args := MakeDefaultTestArgs() + grp := stop.New() secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) - defer toDefer() + db, _, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + // defer toDefer() + defer db.Shutdown() + defer db.Grp.Done() + // db.Cleanup = toDefer if err != nil { t.Error(err) return } - sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() @@ -210,13 +219,13 @@ func TestHeadersSubscribe(t *testing.T) { // Set up logic to read a notification. var received sync.WaitGroup recv := func(client net.Conn) { + defer received.Done() buf := make([]byte, 1024) len, err := client.Read(buf) if err != nil { t.Errorf("read err: %v", err) } t.Logf("len: %v notification: %v", len, string(buf)) - received.Done() } received.Add(2) go recv(client1) @@ -282,7 +291,8 @@ func TestHeadersSubscribe(t *testing.T) { func TestGetBalance(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -311,7 +321,8 @@ func TestGetBalance(t *testing.T) { func TestGetHistory(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -340,7 +351,8 @@ func TestGetHistory(t *testing.T) { func TestListUnspent(t *testing.T) { secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + grp := stop.New() + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) @@ -369,15 +381,16 @@ func TestListUnspent(t *testing.T) { func TestAddressSubscribe(t *testing.T) { args := MakeDefaultTestArgs() + grp := stop.New() secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) + db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer toDefer() if err != nil { t.Error(err) return } - sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() diff --git a/server/server.go b/server/server.go index 46579e5..8920ef9 100644 --- a/server/server.go +++ b/server/server.go @@ -151,7 +151,7 @@ func (s *Server) Run() { } } -func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { +func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, error) { tmpName, err := ioutil.TempDir("", "go-lbry-hub") if err != nil { logrus.Info(err) @@ -161,7 +161,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { if err != nil { logrus.Info(err) } - myDB, _, err := db.GetProdDB(args.DBPath, tmpName) + myDB, _, err := db.GetProdDB(args.DBPath, tmpName, grp) // dbShutdown = func() { // db.Shutdown(myDB) // } @@ -270,12 +270,12 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { var myDB *db.ReadOnlyDBColumnFamily // var dbShutdown = func() {} if !args.DisableResolve { - myDB, err = LoadDatabase(args) + myDB, err = LoadDatabase(args, grp) if err != nil { logrus.Warning(err) } - myDB.Grp = stop.New(grp) - myDB.Grp.Add(1) + // myDB.Grp = stop.New(grp) + // myDB.Grp.Add(1) } // Determine which chain to use based on db and cli values @@ -317,6 +317,8 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { args.GenesisHash = chain.GenesisHash.String() + sessionGrp := stop.New(grp) + s := &Server{ GrpcServer: grpcServer, Args: args, @@ -341,7 +343,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { HeightSubsMut: sync.RWMutex{}, NotifierChan: make(chan interface{}), Grp: grp, - sessionManager: newSessionManager(myDB, args, &chain, args.MaxSessions, args.SessionTimeout), + sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), } // Start up our background services diff --git a/server/session.go b/server/session.go index 8ea6505..a6eee65 100644 --- a/server/session.go +++ b/server/session.go @@ -14,6 +14,7 @@ import ( "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" ) @@ -114,9 +115,10 @@ type sessionMap map[uintptr]*session type sessionManager struct { // sessionsMut protects sessions, headerSubs, hashXSubs state - sessionsMut sync.RWMutex - sessions sessionMap - sessionsWait sync.WaitGroup + sessionsMut sync.RWMutex + sessions sessionMap + // sessionsWait sync.WaitGroup + grp *stop.Group sessionsMax int sessionTimeout time.Duration manageTicker *time.Ticker @@ -129,12 +131,13 @@ type sessionManager struct { hashXSubs map[[HASHX_LEN]byte]sessionMap } -func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { +func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { return &sessionManager{ sessions: make(sessionMap), - sessionsMax: sessionsMax, - sessionTimeout: time.Duration(sessionTimeout) * time.Second, - manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second), + grp: grp, + sessionsMax: args.MaxSessions, + sessionTimeout: time.Duration(args.SessionTimeout) * time.Second, + manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second), db: db, args: args, chain: chain, @@ -170,7 +173,12 @@ func (sm *sessionManager) manage() { } sm.sessionsMut.Unlock() // Wait for next management clock tick. - <-sm.manageTicker.C + select { + case <-sm.grp.Ch(): + return + case <-sm.manageTicker.C: + continue + } } } @@ -229,11 +237,12 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { goto fail } - sm.sessionsWait.Add(1) + sm.grp.Add(1) go func() { s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) log.Infof("session %v goroutine exit", sess.addr.String()) - sm.sessionsWait.Done() + //sm.sessionsWait.Done() + sm.grp.Done() }() return sess