Use time.Ticker object to drive management activity.

This commit is contained in:
Jonathan Moody 2022-10-03 14:37:00 -04:00
parent 1eb645a0b9
commit 5f068341e3
2 changed files with 19 additions and 9 deletions

View file

@ -65,6 +65,13 @@ func min[Ord constraints.Ordered](x, y Ord) Ord {
return y return y
} }
func max[Ord constraints.Ordered](x, y Ord) Ord {
if x > y {
return x
}
return y
}
type BlockHeaderElectrum struct { type BlockHeaderElectrum struct {
Version uint32 `json:"version"` Version uint32 `json:"version"`
PrevBlockHash string `json:"prev_block_hash"` PrevBlockHash string `json:"prev_block_hash"`

View file

@ -119,6 +119,7 @@ type sessionManager struct {
sessionsWait sync.WaitGroup sessionsWait sync.WaitGroup
sessionsMax int sessionsMax int
sessionTimeout time.Duration sessionTimeout time.Duration
manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily db *db.ReadOnlyDBColumnFamily
chain *chaincfg.Params chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe' // headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
@ -132,6 +133,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, se
sessions: make(sessionMap), sessions: make(sessionMap),
sessionsMax: sessionsMax, sessionsMax: sessionsMax,
sessionTimeout: time.Duration(sessionTimeout) * time.Second, sessionTimeout: time.Duration(sessionTimeout) * time.Second,
manageTicker: time.NewTicker(time.Duration(max(5, sessionTimeout/20)) * time.Second),
db: db, db: db,
chain: chain, chain: chain,
headerSubs: make(sessionMap), headerSubs: make(sessionMap),
@ -156,17 +158,18 @@ func (sm *sessionManager) stop() {
} }
func (sm *sessionManager) manage() { func (sm *sessionManager) manage() {
sm.sessionsMut.Lock() for {
for _, sess := range sm.sessions { sm.sessionsMut.Lock()
if time.Since(sess.lastRecv) > sm.sessionTimeout { for _, sess := range sm.sessions {
sm.removeSessionLocked(sess) if time.Since(sess.lastRecv) > sm.sessionTimeout {
log.Infof("session %v timed out", sess.addr.String()) sm.removeSessionLocked(sess)
log.Infof("session %v timed out", sess.addr.String())
}
} }
sm.sessionsMut.Unlock()
// Wait for next management clock tick.
<-sm.manageTicker.C
} }
sm.sessionsMut.Unlock()
dur, _ := time.ParseDuration("10s")
time.AfterFunc(dur, func() { sm.manage() })
} }
func (sm *sessionManager) addSession(conn net.Conn) *session { func (sm *sessionManager) addSession(conn net.Conn) *session {