From 7679b962316f7c82e1a162de0b1664ab547e94bc Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 15 Oct 2022 15:00:58 +0000 Subject: [PATCH] updates and refactors --- db/db.go | 28 +++++-------------------- db/iteroptions.go | 11 ++-------- server/jsonrpc_blockchain_test.go | 6 ++++-- server/jsonrpc_server.go | 10 ++++++--- server/jsonrpc_service.go | 34 ++++++++++++++++++------------- server/server.go | 4 ++-- server/session.go | 13 ++++++++++-- 7 files changed, 51 insertions(+), 55 deletions(-) diff --git a/db/db.go b/db/db.go index 4b2823f..dc9de53 100644 --- a/db/db.go +++ b/db/db.go @@ -341,24 +341,10 @@ func interruptRequested(interrupted <-chan struct{}) bool { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) - // iterKey := fmt.Sprintf("%p", opts) - if opts.DB != nil { - // opts.DB.ItMut.Lock() - // // There is a tiny chance that we were wating on the above lock while shutdown was - // // being called and by the time we get it the db has already notified all active - // // iterators to shutdown. In this case we go to the else branch. - // if !opts.DB.ShutdownCalled { - // opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} - // opts.DB.ItMut.Unlock() - // } else { - // opts.DB.ItMut.Unlock() - // return ch - // } - if opts.DB.ShutdownCalled && opts.Grp != nil { - // opts.Grp.DoneNamed(iterKey) - opts.Grp.Done() - return ch - } + // Check if we've been told to shutdown in between getting created and getting here + if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) { + opts.Grp.Done() + return ch } ro := grocksdb.NewDefaultReadOptions() @@ -376,13 +362,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { it.Close() close(ch) ro.Destroy() - if opts.DB != nil && opts.Grp != nil { + if opts.Grp != nil { // opts.Grp.DoneNamed(iterKey) opts.Grp.Done() - // opts.DoneChan <- struct{}{} - // opts.DB.ItMut.Lock() - // delete(opts.DB.OpenIterators, iterKey) - // opts.DB.ItMut.Unlock() } }() diff --git a/db/iteroptions.go b/db/iteroptions.go index f960a69..c4b2467 100644 --- a/db/iteroptions.go +++ b/db/iteroptions.go @@ -24,9 +24,7 @@ type IterOptions struct { RawKey bool RawValue bool Grp *stop.Group - // ShutdownChan chan struct{} - // DoneChan chan struct{} - DB *ReadOnlyDBColumnFamily + // DB *ReadOnlyDBColumnFamily CfHandle *grocksdb.ColumnFamilyHandle It *grocksdb.Iterator Serializer *prefixes.SerializationAPI @@ -46,9 +44,7 @@ func NewIterateOptions() *IterOptions { RawKey: false, RawValue: false, Grp: nil, - // ShutdownChan: make(chan struct{}, 1), - // DoneChan: make(chan struct{}, 1), - DB: nil, + // DB: nil, CfHandle: nil, It: nil, Serializer: prefixes.ProductionAPI, @@ -111,9 +107,6 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { } func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { - o.DB = db - // o.Grp = stop.NewDebug(db.Grp) - // iterKey := fmt.Sprintf("%p", o) // o.Grp.AddNamed(1, iterKey) o.Grp = stop.New(db.Grp) o.Grp.Add(1) diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index 2c94ebb..74f5381 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -189,6 +189,7 @@ func TestHeaders(t *testing.T) { } func TestHeadersSubscribe(t *testing.T) { + args := MakeDefaultTestArgs() secondaryPath := "asdf" db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) defer toDefer() @@ -197,7 +198,7 @@ func TestHeadersSubscribe(t *testing.T) { return } - sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm.start() defer sm.stop() @@ -367,6 +368,7 @@ func TestListUnspent(t *testing.T) { } func TestAddressSubscribe(t *testing.T) { + args := MakeDefaultTestArgs() secondaryPath := "asdf" db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) defer toDefer() @@ -375,7 +377,7 @@ func TestAddressSubscribe(t *testing.T) { return } - sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) + sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm.start() defer sm.stop() diff --git a/server/jsonrpc_server.go b/server/jsonrpc_server.go index 104fcac..9f35896 100644 --- a/server/jsonrpc_server.go +++ b/server/jsonrpc_server.go @@ -4,6 +4,10 @@ import ( log "github.com/sirupsen/logrus" ) +type ServerService struct { + Args *Args +} + type ServerFeatureService struct { Args *Args } @@ -26,7 +30,7 @@ type ServerFeaturesRes struct { } // Features is the json rpc endpoint for 'server.features'. -func (t *ServerFeatureService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error { +func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error { log.Println("Features") features := &ServerFeaturesRes{ @@ -57,7 +61,7 @@ type ServerBannerReq struct{} type ServerBannerRes string // Banner is the json rpc endpoint for 'server.banner'. -func (t *ServerBannerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error { +func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error { log.Println("Banner") *res = (*ServerBannerRes)(t.Args.Banner) @@ -76,7 +80,7 @@ type ServerVersionRes string // Banner is the json rpc endpoint for 'server.version'. // FIXME: This should return a struct with the version and the protocol version. // <<-- that comment was written by github, scary shit because it's true -func (t *ServerVersionService) Version(req *ServerVersionReq, res **ServerVersionRes) error { +func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error { log.Println("Version") *res = (*ServerVersionRes)(&t.Args.ServerVersion) diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index 4955701..b9865cc 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -122,26 +122,32 @@ fail1: } // Register "server.{features,banner,version}" handlers. - serverFeatureSvc := &ServerFeatureService{s.Args} - err = s1.RegisterTCPService(serverFeatureSvc, "server_features") + serverSvc := &ServerService{s.Args} + err = s1.RegisterTCPService(serverSvc, "server") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) goto fail2 } + // serverFeatureSvc := &ServerFeatureService{s.Args} + // err = s1.RegisterTCPService(serverFeatureSvc, "server_features") + // if err != nil { + // log.Errorf("RegisterTCPService: %v\n", err) + // goto fail2 + // } - serverBannerSvc := &ServerBannerService{s.Args} - err = s1.RegisterTCPService(serverBannerSvc, "server_banner") - if err != nil { - log.Errorf("RegisterTCPService: %v\n", err) - goto fail2 - } + // serverBannerSvc := &ServerBannerService{s.Args} + // err = s1.RegisterTCPService(serverBannerSvc, "server_banner") + // if err != nil { + // log.Errorf("RegisterTCPService: %v\n", err) + // goto fail2 + // } - serverVersionSvc := &ServerVersionService{s.Args} - err = s1.RegisterTCPService(serverVersionSvc, "server_version") - if err != nil { - log.Errorf("RegisterTCPService: %v\n", err) - goto fail2 - } + // serverVersionSvc := &ServerVersionService{s.Args} + // err = s1.RegisterTCPService(serverVersionSvc, "server_version") + // if err != nil { + // log.Errorf("RegisterTCPService: %v\n", err) + // goto fail2 + // } r := gorilla_mux.NewRouter() r.Handle("/rpc", s1) diff --git a/server/server.go b/server/server.go index c53927a..c2d3588 100644 --- a/server/server.go +++ b/server/server.go @@ -274,7 +274,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { if err != nil { logrus.Warning(err) } - myDB.Grp = stop.NewDebug(grp) + myDB.Grp = stop.New(grp) } // Determine which chain to use based on db and cli values @@ -340,7 +340,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { HeightSubsMut: sync.RWMutex{}, NotifierChan: make(chan interface{}), Grp: grp, - sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout), + sessionManager: newSessionManager(myDB, args, &chain, args.MaxSessions, args.SessionTimeout), } // Start up our background services diff --git a/server/session.go b/server/session.go index 9b9fbb7..8ea6505 100644 --- a/server/session.go +++ b/server/session.go @@ -121,6 +121,7 @@ type sessionManager struct { sessionTimeout time.Duration manageTicker *time.Ticker db *db.ReadOnlyDBColumnFamily + args *Args chain *chaincfg.Params // headerSubs are sessions subscribed via 'blockchain.headers.subscribe' headerSubs sessionMap @@ -128,13 +129,14 @@ type sessionManager struct { hashXSubs map[[HASHX_LEN]byte]sessionMap } -func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { +func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { return &sessionManager{ sessions: make(sessionMap), sessionsMax: sessionsMax, sessionTimeout: time.Duration(sessionTimeout) * time.Second, manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second), db: db, + args: args, chain: chain, headerSubs: make(sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), @@ -190,9 +192,16 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { // each request and update subscriptions. s1 := rpc.NewServer() + // Register "server.{features,banner,version}" handlers. + serverSvc := &ServerService{sm.args} + err := s1.RegisterName("server", serverSvc) + if err != nil { + log.Errorf("RegisterTCPService: %v\n", err) + } + // Register "blockchain.claimtrie.*"" handlers. claimtrieSvc := &ClaimtrieService{sm.db} - err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc) + err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) if err != nil { log.Errorf("RegisterService: %v\n", err) }