updates and refactors

This commit is contained in:
Jeffrey Picard 2022-10-15 15:00:58 +00:00
parent 77fee923cb
commit 7679b96231
7 changed files with 51 additions and 55 deletions

View file

@ -341,25 +341,11 @@ func interruptRequested(interrupted <-chan struct{}) bool {
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV) ch := make(chan *prefixes.PrefixRowKV)
// iterKey := fmt.Sprintf("%p", opts) // Check if we've been told to shutdown in between getting created and getting here
if opts.DB != nil { if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) {
// opts.DB.ItMut.Lock()
// // There is a tiny chance that we were wating on the above lock while shutdown was
// // being called and by the time we get it the db has already notified all active
// // iterators to shutdown. In this case we go to the else branch.
// if !opts.DB.ShutdownCalled {
// opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
// opts.DB.ItMut.Unlock()
// } else {
// opts.DB.ItMut.Unlock()
// return ch
// }
if opts.DB.ShutdownCalled && opts.Grp != nil {
// opts.Grp.DoneNamed(iterKey)
opts.Grp.Done() opts.Grp.Done()
return ch return ch
} }
}
ro := grocksdb.NewDefaultReadOptions() ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(opts.FillCache) ro.SetFillCache(opts.FillCache)
@ -376,13 +362,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
it.Close() it.Close()
close(ch) close(ch)
ro.Destroy() ro.Destroy()
if opts.DB != nil && opts.Grp != nil { if opts.Grp != nil {
// opts.Grp.DoneNamed(iterKey) // opts.Grp.DoneNamed(iterKey)
opts.Grp.Done() opts.Grp.Done()
// opts.DoneChan <- struct{}{}
// opts.DB.ItMut.Lock()
// delete(opts.DB.OpenIterators, iterKey)
// opts.DB.ItMut.Unlock()
} }
}() }()

View file

@ -24,9 +24,7 @@ type IterOptions struct {
RawKey bool RawKey bool
RawValue bool RawValue bool
Grp *stop.Group Grp *stop.Group
// ShutdownChan chan struct{} // DB *ReadOnlyDBColumnFamily
// DoneChan chan struct{}
DB *ReadOnlyDBColumnFamily
CfHandle *grocksdb.ColumnFamilyHandle CfHandle *grocksdb.ColumnFamilyHandle
It *grocksdb.Iterator It *grocksdb.Iterator
Serializer *prefixes.SerializationAPI Serializer *prefixes.SerializationAPI
@ -46,9 +44,7 @@ func NewIterateOptions() *IterOptions {
RawKey: false, RawKey: false,
RawValue: false, RawValue: false,
Grp: nil, Grp: nil,
// ShutdownChan: make(chan struct{}, 1), // DB: nil,
// DoneChan: make(chan struct{}, 1),
DB: nil,
CfHandle: nil, CfHandle: nil,
It: nil, It: nil,
Serializer: prefixes.ProductionAPI, Serializer: prefixes.ProductionAPI,
@ -111,9 +107,6 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
} }
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
o.DB = db
// o.Grp = stop.NewDebug(db.Grp)
// iterKey := fmt.Sprintf("%p", o)
// o.Grp.AddNamed(1, iterKey) // o.Grp.AddNamed(1, iterKey)
o.Grp = stop.New(db.Grp) o.Grp = stop.New(db.Grp)
o.Grp.Add(1) o.Grp.Add(1)

View file

@ -189,6 +189,7 @@ func TestHeaders(t *testing.T) {
} }
func TestHeadersSubscribe(t *testing.T) { func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer() defer toDefer()
@ -197,7 +198,7 @@ func TestHeadersSubscribe(t *testing.T) {
return return
} }
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm.start() sm.start()
defer sm.stop() defer sm.stop()
@ -367,6 +368,7 @@ func TestListUnspent(t *testing.T) {
} }
func TestAddressSubscribe(t *testing.T) { func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
secondaryPath := "asdf" secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath) db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer() defer toDefer()
@ -375,7 +377,7 @@ func TestAddressSubscribe(t *testing.T) {
return return
} }
sm := newSessionManager(db, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout) sm := newSessionManager(db, args, &chaincfg.RegressionNetParams, DefaultMaxSessions, DefaultSessionTimeout)
sm.start() sm.start()
defer sm.stop() defer sm.stop()

View file

@ -4,6 +4,10 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type ServerService struct {
Args *Args
}
type ServerFeatureService struct { type ServerFeatureService struct {
Args *Args Args *Args
} }
@ -26,7 +30,7 @@ type ServerFeaturesRes struct {
} }
// Features is the json rpc endpoint for 'server.features'. // Features is the json rpc endpoint for 'server.features'.
func (t *ServerFeatureService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error { func (t *ServerService) Features(req *ServerFeaturesReq, res **ServerFeaturesRes) error {
log.Println("Features") log.Println("Features")
features := &ServerFeaturesRes{ features := &ServerFeaturesRes{
@ -57,7 +61,7 @@ type ServerBannerReq struct{}
type ServerBannerRes string type ServerBannerRes string
// Banner is the json rpc endpoint for 'server.banner'. // Banner is the json rpc endpoint for 'server.banner'.
func (t *ServerBannerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error { func (t *ServerService) Banner(req *ServerBannerReq, res **ServerBannerRes) error {
log.Println("Banner") log.Println("Banner")
*res = (*ServerBannerRes)(t.Args.Banner) *res = (*ServerBannerRes)(t.Args.Banner)
@ -76,7 +80,7 @@ type ServerVersionRes string
// Banner is the json rpc endpoint for 'server.version'. // Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version. // FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true // <<-- that comment was written by github, scary shit because it's true
func (t *ServerVersionService) Version(req *ServerVersionReq, res **ServerVersionRes) error { func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version") log.Println("Version")
*res = (*ServerVersionRes)(&t.Args.ServerVersion) *res = (*ServerVersionRes)(&t.Args.ServerVersion)

View file

@ -122,26 +122,32 @@ fail1:
} }
// Register "server.{features,banner,version}" handlers. // Register "server.{features,banner,version}" handlers.
serverFeatureSvc := &ServerFeatureService{s.Args} serverSvc := &ServerService{s.Args}
err = s1.RegisterTCPService(serverFeatureSvc, "server_features") err = s1.RegisterTCPService(serverSvc, "server")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2
} }
// serverFeatureSvc := &ServerFeatureService{s.Args}
// err = s1.RegisterTCPService(serverFeatureSvc, "server_features")
// if err != nil {
// log.Errorf("RegisterTCPService: %v\n", err)
// goto fail2
// }
serverBannerSvc := &ServerBannerService{s.Args} // serverBannerSvc := &ServerBannerService{s.Args}
err = s1.RegisterTCPService(serverBannerSvc, "server_banner") // err = s1.RegisterTCPService(serverBannerSvc, "server_banner")
if err != nil { // if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) // log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 // goto fail2
} // }
serverVersionSvc := &ServerVersionService{s.Args} // serverVersionSvc := &ServerVersionService{s.Args}
err = s1.RegisterTCPService(serverVersionSvc, "server_version") // err = s1.RegisterTCPService(serverVersionSvc, "server_version")
if err != nil { // if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) // log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 // goto fail2
} // }
r := gorilla_mux.NewRouter() r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1) r.Handle("/rpc", s1)

View file

@ -274,7 +274,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
if err != nil { if err != nil {
logrus.Warning(err) logrus.Warning(err)
} }
myDB.Grp = stop.NewDebug(grp) myDB.Grp = stop.New(grp)
} }
// Determine which chain to use based on db and cli values // Determine which chain to use based on db and cli values
@ -340,7 +340,7 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
HeightSubsMut: sync.RWMutex{}, HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}), NotifierChan: make(chan interface{}),
Grp: grp, Grp: grp,
sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout), sessionManager: newSessionManager(myDB, args, &chain, args.MaxSessions, args.SessionTimeout),
} }
// Start up our background services // Start up our background services

View file

@ -121,6 +121,7 @@ type sessionManager struct {
sessionTimeout time.Duration sessionTimeout time.Duration
manageTicker *time.Ticker manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily db *db.ReadOnlyDBColumnFamily
args *Args
chain *chaincfg.Params chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe' // headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap headerSubs sessionMap
@ -128,13 +129,14 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap hashXSubs map[[HASHX_LEN]byte]sessionMap
} }
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager { func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
return &sessionManager{ return &sessionManager{
sessions: make(sessionMap), sessions: make(sessionMap),
sessionsMax: sessionsMax, sessionsMax: sessionsMax,
sessionTimeout: time.Duration(sessionTimeout) * time.Second, sessionTimeout: time.Duration(sessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second), manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
db: db, db: db,
args: args,
chain: chain, chain: chain,
headerSubs: make(sessionMap), headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
@ -190,9 +192,16 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
// each request and update subscriptions. // each request and update subscriptions.
s1 := rpc.NewServer() s1 := rpc.NewServer()
// Register "server.{features,banner,version}" handlers.
serverSvc := &ServerService{sm.args}
err := s1.RegisterName("server", serverSvc)
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers. // Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db} claimtrieSvc := &ClaimtrieService{sm.db}
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc) err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil { if err != nil {
log.Errorf("RegisterService: %v\n", err) log.Errorf("RegisterService: %v\n", err)
} }