Add --max-sessions, --session-timeout args. Enforce max sessions.

This commit is contained in:
Jonathan Moody 2022-09-28 14:49:32 -05:00
parent e56edf0c9a
commit 813fd4590a
4 changed files with 41 additions and 18 deletions

View file

@ -30,6 +30,8 @@ type Args struct {
NotifierPort string NotifierPort string
JSONRPCPort int JSONRPCPort int
JSONRPCHTTPPort int JSONRPCHTTPPort int
MaxSessions int
SessionTimeout int
EsIndex string EsIndex string
RefreshDelta int RefreshDelta int
CacheTTL int CacheTTL int
@ -61,6 +63,8 @@ const (
DefaultPrometheusPort = "2112" DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080" DefaultNotifierPort = "18080"
DefaultJSONRPCPort = 50001 DefaultJSONRPCPort = 50001
DefaultMaxSessions = 10000
DefaultSessionTimeout = 300
DefaultRefreshDelta = 5 DefaultRefreshDelta = 5
DefaultCacheTTL = 5 DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt" DefaultPeerFile = "peers.txt"
@ -129,6 +133,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort}) jsonRPCPort := parser.Int("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Validate: validatePort})
jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort}) jsonRPCHTTPPort := parser.Int("", "json-rpc-http-port", &argparse.Options{Required: false, Help: "JSON RPC over HTTP port", Validate: validatePort})
maxSessions := parser.Int("", "max-sessions", &argparse.Options{Required: false, Help: "Maximum number of electrum clients that can be connected", Default: DefaultMaxSessions})
sessionTimeout := parser.Int("", "session-timeout", &argparse.Options{Required: false, Help: "Session inactivity timeout (seconds)", Default: DefaultSessionTimeout})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
@ -183,6 +189,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
NotifierPort: *notifierPort, NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort, JSONRPCPort: *jsonRPCPort,
JSONRPCHTTPPort: *jsonRPCHTTPPort, JSONRPCHTTPPort: *jsonRPCHTTPPort,
MaxSessions: *maxSessions,
SessionTimeout: *sessionTimeout,
EsIndex: *esIndex, EsIndex: *esIndex,
RefreshDelta: *refreshDelta, RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL, CacheTTL: *cacheTTL,

View file

@ -11,6 +11,7 @@ import (
gorilla_rpc "github.com/gorilla/rpc" gorilla_rpc "github.com/gorilla/rpc"
gorilla_json "github.com/gorilla/rpc/json" gorilla_json "github.com/gorilla/rpc/json"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/net/netutil"
) )
type gorillaRpcCodec struct { type gorillaRpcCodec struct {
@ -79,7 +80,7 @@ func (s *Server) StartJsonRPC() error {
s.sessionManager.addSession(conn) s.sessionManager.addSession(conn)
} }
} }
go acceptConnections(listener) go acceptConnections(netutil.LimitListener(listener, s.sessionManager.sessionsMax))
} }
fail1: fail1:
@ -109,7 +110,7 @@ fail1:
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2
} }
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2

View file

@ -333,7 +333,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
HeightSubs: make(map[net.Addr]net.Conn), HeightSubs: make(map[net.Addr]net.Conn),
HeightSubsMut: sync.RWMutex{}, HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}), NotifierChan: make(chan interface{}),
sessionManager: newSessionManager(myDB, &chain), sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout),
} }
// Start up our background services // Start up our background services

View file

@ -18,8 +18,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var SESSION_INACTIVE_TIMEOUT = 2 * time.Minute
type headerNotification struct { type headerNotification struct {
internal.HeightHash internal.HeightHash
blockHeader [HEADER_SIZE]byte blockHeader [HEADER_SIZE]byte
@ -116,23 +114,28 @@ type sessionMap map[net.Addr]*session
type sessionManager struct { type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state // sessionsMut protects sessions, headerSubs, hashXSubs state
sessionsMut sync.RWMutex sessionsMut sync.RWMutex
sessions sessionMap sessions sessionMap
db *db.ReadOnlyDBColumnFamily sessionsWait sync.WaitGroup
chain *chaincfg.Params sessionsMax int
sessionTimeout time.Duration
db *db.ReadOnlyDBColumnFamily
chain *chaincfg.Params
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe' // headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
headerSubs sessionMap headerSubs sessionMap
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe' // hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
hashXSubs map[[HASHX_LEN]byte]sessionMap hashXSubs map[[HASHX_LEN]byte]sessionMap
} }
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params) *sessionManager { func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params, sessionsMax, sessionTimeout int) *sessionManager {
return &sessionManager{ return &sessionManager{
sessions: make(sessionMap), sessions: make(sessionMap),
db: db, sessionsMax: sessionsMax,
chain: chain, sessionTimeout: time.Duration(sessionTimeout) * time.Second,
headerSubs: make(sessionMap), db: db,
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), chain: chain,
headerSubs: make(sessionMap),
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
} }
} }
@ -153,12 +156,14 @@ func (sm *sessionManager) stop() {
} }
func (sm *sessionManager) manage() { func (sm *sessionManager) manage() {
sm.sessionsMut.Lock()
for _, sess := range sm.sessions { for _, sess := range sm.sessions {
if time.Since(sess.lastRecv) > SESSION_INACTIVE_TIMEOUT { if time.Since(sess.lastRecv) > sm.sessionTimeout {
sm.removeSession(sess) sm.removeSessionLocked(sess)
log.Infof("session %v timed out", sess.addr.String()) log.Infof("session %v timed out", sess.addr.String())
} }
} }
sm.sessionsMut.Unlock()
// TEMPORARY TESTING: Send fake notification for specific address. // TEMPORARY TESTING: Send fake notification for specific address.
address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain) address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain)
@ -220,7 +225,12 @@ func (sm *sessionManager) addSession(conn net.Conn) {
goto fail goto fail
} }
go s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) sm.sessionsWait.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done()
}()
return return
fail: fail:
@ -230,6 +240,10 @@ fail:
func (sm *sessionManager) removeSession(sess *session) { func (sm *sessionManager) removeSession(sess *session) {
sm.sessionsMut.Lock() sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock() defer sm.sessionsMut.Unlock()
sm.removeSessionLocked(sess)
}
func (sm *sessionManager) removeSessionLocked(sess *session) {
if sess.headersSub { if sess.headersSub {
delete(sm.headerSubs, sess.addr) delete(sm.headerSubs, sess.addr)
} }