diff --git a/db/db.go b/db/db.go index c0aaf1e..4051c50 100644 --- a/db/db.go +++ b/db/db.go @@ -531,8 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er } // GetProdDB returns a db that is used for production. -func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, func(), error) { - grp.Add(1) +func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) { prefixNames := prefixes.GetPrefixes() // additional prefixes that aren't in the code explicitly cfNames := []string{"default", "e", "d", "c"} @@ -551,7 +550,8 @@ func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBC } if err != nil { - return nil, cleanupFiles, err + cleanupFiles() + return nil, err } cleanupDB := func() { @@ -560,7 +560,7 @@ func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBC } db.Cleanup = cleanupDB - return db, cleanupDB, nil + return db, nil } // GetDBColumnFamilies gets a db with the specified column families and secondary path. @@ -679,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() { // to keep the db readonly view up to date and handle reorgs on the // blockchain. func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { + db.Grp.Add(1) go func() { lastPrint := time.Now() for { diff --git a/db/db_test.go b/db/db_test.go index 792b62a..e7114eb 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -21,7 +21,7 @@ import ( //////////////////////////////////////////////////////////////////////////////// // OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names -func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) { +func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) { log.Println(filePath) file, err := os.Open(filePath) @@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami reader := csv.NewReader(file) records, err := reader.ReadAll() if err != nil { - return nil, nil, nil, err + return nil, nil, err } wOpts := grocksdb.NewDefaultWriteOptions() @@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami opts.SetCreateIfMissing(true) db, err := grocksdb.OpenDb(opts, "tmp") if err != nil { - return nil, nil, nil, err + return nil, nil, err } var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle) @@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami log.Println(cfName) handle, err := db.CreateColumnFamily(opts, cfName) if err != nil { - return nil, nil, nil, err + return nil, nil, err } handleMap[cfName] = handle } @@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami for _, record := range records[1:] { cf := record[0] if err != nil { - return nil, nil, nil, err + return nil, nil, err } handle := handleMap[string(cf)] key, err := hex.DecodeString(record[1]) if err != nil { - return nil, nil, nil, err + return nil, nil, err } val, err := hex.DecodeString(record[2]) if err != nil { - return nil, nil, nil, err + return nil, nil, err } db.PutCF(wOpts, handle, key, val) } @@ -95,6 +95,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami Height: 0, Headers: nil, Grp: stop.New(), + Cleanup: toDefer, } // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this @@ -104,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami err = myDB.InitTxCounts() if err != nil { - return nil, nil, nil, err + return nil, nil, err } // err = dbpkg.InitHeaders(myDB) @@ -112,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami // return nil, nil, nil, err // } - return myDB, records, toDefer, nil + return myDB, records, nil } // OpenAndFillTmpDBCF opens a db and fills it with data from a csv file @@ -252,11 +253,9 @@ func TestCatFullDB(t *testing.T) { dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) defer db.Shutdown() - // defer toDefer() - db.Cleanup = toDefer if err != nil { t.Error(err) return @@ -287,10 +286,8 @@ func TestOpenFullDB(t *testing.T) { dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/" // dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" secondaryPath := "asdf" - db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) + db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp) defer db.Shutdown() - // defer toDefer() - db.Cleanup = toDefer if err != nil { t.Error(err) return @@ -304,14 +301,13 @@ func TestOpenFullDB(t *testing.T) { func TestResolve(t *testing.T) { url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" filePath := "../testdata/FULL_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) return } - // defer toDefer() - db.Cleanup = toDefer + expandedResolveResult := db.Resolve(url) log.Printf("%#v\n", expandedResolveResult) if expandedResolveResult != nil && expandedResolveResult.Channel != nil { @@ -325,13 +321,11 @@ func TestResolve(t *testing.T) { func TestGetDBState(t *testing.T) { filePath := "../testdata/s_resolve.csv" want := uint32(1072108) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer state, err := db.GetDBState() if err != nil { t.Error(err) @@ -349,13 +343,11 @@ func TestGetRepostedClaim(t *testing.T) { // Should be non-existent channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/W_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -384,13 +376,11 @@ func TestGetRepostedCount(t *testing.T) { // Should be non-existent channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf") filePath := "../testdata/j_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer count, err := db.GetRepostedCount(channelHash) if err != nil { @@ -423,13 +413,11 @@ func TestGetRepost(t *testing.T) { channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2") filePath := "../testdata/V_resolve.csv" // want := uint32(3670) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer res, err := db.GetRepost(channelHash) if err != nil { @@ -459,13 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) { channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd") filePath := "../testdata/Z_resolve.csv" want := uint32(3670) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + count, err := db.GetClaimsInChannelCount(channelHash) if err != nil { t.Error(err) @@ -490,13 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) { var position uint16 = 0 filePath := "../testdata/F_resolve.csv" log.Println(filePath) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position) if err != nil { t.Error(err) @@ -510,13 +496,11 @@ func TestClaimShortIdIter(t *testing.T) { filePath := "../testdata/F_cat.csv" normalName := "cat" claimId := "0" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer ch := db.ClaimShortIdIter(normalName, claimId) @@ -542,13 +526,12 @@ func TestGetTXOToClaim(t *testing.T) { var txNum uint32 = 1456296 var position uint16 = 0 filePath := "../testdata/G_2.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + val, err := db.GetCachedClaimHash(txNum, position) if err != nil { t.Error(err) @@ -572,13 +555,11 @@ func TestGetClaimToChannel(t *testing.T) { var val []byte = nil filePath := "../testdata/I_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer val, err = db.GetChannelForClaim(claimHash, txNum, position) if err != nil { @@ -603,13 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) { want := uint64(20000006) claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, true) @@ -635,13 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) { want := uint64(21000006) claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, _ := hex.DecodeString(claimHashStr) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + db.Height = 999999999 amount, err := db.GetEffectiveAmount(claimHash, false) @@ -674,13 +652,12 @@ func TestGetSupportAmount(t *testing.T) { t.Error(err) } filePath := "../testdata/a_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + res, err := db.GetSupportAmount(claimHash) if err != nil { t.Error(err) @@ -696,13 +673,12 @@ func TestGetTxHash(t *testing.T) { want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde" filePath := "../testdata/X_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + resHash, err := db.GetTxHash(txNum) if err != nil { t.Error(err) @@ -740,13 +716,12 @@ func TestGetActivation(t *testing.T) { txNum := uint32(0x6284e3) position := uint16(0x0) want := uint32(0xa6b65) - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) } - // defer toDefer() - db.Cleanup = toDefer + activation, err := db.GetActivation(txNum, position) if err != nil { t.Error(err) @@ -773,14 +748,13 @@ func TestGetClaimToTXO(t *testing.T) { return } filePath := "../testdata/E_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) return } - // defer toDefer() - db.Cleanup = toDefer + res, err := db.GetCachedClaimTxo(claimHash, true) if err != nil { t.Error(err) @@ -804,14 +778,13 @@ func TestGetControllingClaim(t *testing.T) { claimName := internal.NormalizeName("@Styxhexenhammer666") claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd" filePath := "../testdata/P_resolve.csv" - db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + db, _, err := OpenAndFillTmpDBColumnFamlies(filePath) defer db.Shutdown() if err != nil { t.Error(err) return } - // defer toDefer() - db.Cleanup = toDefer + res, err := db.GetControllingClaim(claimName) if err != nil { t.Error(err) diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index e9d870a..16bb9dd 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -59,8 +59,8 @@ var regTestAddrs = [30]string{ func TestServerGetHeight(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -90,8 +90,8 @@ func TestServerGetHeight(t *testing.T) { func TestGetChunk(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -134,8 +134,8 @@ func TestGetChunk(t *testing.T) { func TestGetHeader(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -164,8 +164,8 @@ func TestGetHeader(t *testing.T) { func TestHeaders(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -197,11 +197,8 @@ func TestHeadersSubscribe(t *testing.T) { args := MakeDefaultTestArgs() grp := stop.New() secondaryPath := "asdf" - db, _, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - // defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) defer db.Shutdown() - defer db.Grp.Done() - // db.Cleanup = toDefer if err != nil { t.Error(err) return @@ -292,8 +289,8 @@ func TestHeadersSubscribe(t *testing.T) { func TestGetBalance(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -322,8 +319,8 @@ func TestGetBalance(t *testing.T) { func TestGetHistory(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -352,8 +349,8 @@ func TestGetHistory(t *testing.T) { func TestListUnspent(t *testing.T) { secondaryPath := "asdf" grp := stop.New() - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return @@ -383,8 +380,8 @@ func TestAddressSubscribe(t *testing.T) { args := MakeDefaultTestArgs() grp := stop.New() secondaryPath := "asdf" - db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) - defer toDefer() + db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp) + defer db.Shutdown() if err != nil { t.Error(err) return diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index b9865cc..ae0d757 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -128,26 +128,6 @@ fail1: log.Errorf("RegisterTCPService: %v\n", err) goto fail2 } - // serverFeatureSvc := &ServerFeatureService{s.Args} - // err = s1.RegisterTCPService(serverFeatureSvc, "server_features") - // if err != nil { - // log.Errorf("RegisterTCPService: %v\n", err) - // goto fail2 - // } - - // serverBannerSvc := &ServerBannerService{s.Args} - // err = s1.RegisterTCPService(serverBannerSvc, "server_banner") - // if err != nil { - // log.Errorf("RegisterTCPService: %v\n", err) - // goto fail2 - // } - - // serverVersionSvc := &ServerVersionService{s.Args} - // err = s1.RegisterTCPService(serverVersionSvc, "server_version") - // if err != nil { - // log.Errorf("RegisterTCPService: %v\n", err) - // goto fail2 - // } r := gorilla_mux.NewRouter() r.Handle("/rpc", s1) diff --git a/server/server.go b/server/server.go index ce4060e..44f4bdf 100644 --- a/server/server.go +++ b/server/server.go @@ -161,10 +161,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro if err != nil { logrus.Info(err) } - myDB, _, err := db.GetProdDB(args.DBPath, tmpName, grp) - // dbShutdown = func() { - // db.Shutdown(myDB) - // } + myDB, err := db.GetProdDB(args.DBPath, tmpName, grp) if err != nil { // Can't load the db, fail loudly logrus.Info(err) diff --git a/server/session.go b/server/session.go index 98f3f8c..5caf06b 100644 --- a/server/session.go +++ b/server/session.go @@ -206,14 +206,14 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { serverSvc := &ServerService{sm.args} err := s1.RegisterName("server", serverSvc) if err != nil { - log.Errorf("RegisterTCPService: %v\n", err) + log.Errorf("RegisterName: %v\n", err) } // Register "blockchain.claimtrie.*"" handlers. claimtrieSvc := &ClaimtrieService{sm.db} err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) if err != nil { - log.Errorf("RegisterService: %v\n", err) + log.Errorf("RegisterName: %v\n", err) } // Register other "blockchain.{block,address,scripthash}.*" handlers.