From 3ddcbbb55dbb4095595f9eb4ea7996b08e11c900 Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Thu, 29 Sep 2022 12:10:06 -0500 Subject: [PATCH] Changes to make session.go testable. Conn created with Pipe() used in testing has no unique Addr. --- server/session.go | 39 ++++++++++++++++----------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/server/session.go b/server/session.go index 9c8f9ea..0ae1b11 100644 --- a/server/session.go +++ b/server/session.go @@ -9,12 +9,11 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg" - "github.com/lbryio/lbcd/txscript" - "github.com/lbryio/lbcutil" log "github.com/sirupsen/logrus" ) @@ -32,6 +31,7 @@ type hashXNotification struct { } type session struct { + id uintptr addr net.Addr conn net.Conn // hashXSubs maps hashX to the original subscription key (address or scripthash) @@ -71,7 +71,7 @@ func (s *session) doNotify(notification interface{}) { } } else { header := note.blockHeaderElectrum - if len(header.PrevBlockHash) == 0 { // not initialized + if header == nil { // not initialized header = newBlockHeaderElectrum(¬e.blockHeader, uint32(heightHash.Height)) } params = header @@ -110,7 +110,7 @@ func (s *session) doNotify(notification interface{}) { s.lastSend = time.Now() } -type sessionMap map[net.Addr]*session +type sessionMap map[uintptr]*session type sessionManager struct { // sessionsMut protects sessions, headerSubs, hashXSubs state @@ -165,20 +165,11 @@ func (sm *sessionManager) manage() { } sm.sessionsMut.Unlock() - // TEMPORARY TESTING: Send fake notification for specific address. - address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain) - script, _ := txscript.PayToAddrScript(address) - hashX := hashXScript(script, sm.chain) - note := hashXNotification{} - copy(note.hashX[:], hashX) - note.status = append(note.status, []byte("fake status bytes")...) - sm.doNotify(note) - dur, _ := time.ParseDuration("10s") time.AfterFunc(dur, func() { sm.manage() }) } -func (sm *sessionManager) addSession(conn net.Conn) { +func (sm *sessionManager) addSession(conn net.Conn) *session { sm.sessionsMut.Lock() sess := &session{ addr: conn.RemoteAddr(), @@ -187,7 +178,8 @@ func (sm *sessionManager) addSession(conn net.Conn) { client: jsonrpc.NewClientCodec(conn), lastRecv: time.Now(), } - sm.sessions[sess.addr] = sess + sess.id = uintptr(unsafe.Pointer(sess)) + sm.sessions[sess.id] = sess sm.sessionsMut.Unlock() // Create a new RPC server. These services are linked to the @@ -231,10 +223,11 @@ func (sm *sessionManager) addSession(conn net.Conn) { log.Infof("session %v goroutine exit", sess.addr.String()) sm.sessionsWait.Done() }() - return + return sess fail: sm.removeSession(sess) + return nil } func (sm *sessionManager) removeSession(sess *session) { @@ -245,16 +238,16 @@ func (sm *sessionManager) removeSession(sess *session) { func (sm *sessionManager) removeSessionLocked(sess *session) { if sess.headersSub { - delete(sm.headerSubs, sess.addr) + delete(sm.headerSubs, sess.id) } for hashX := range sess.hashXSubs { subs, ok := sm.hashXSubs[hashX] if !ok { continue } - delete(subs, sess.addr) + delete(subs, sess.id) } - delete(sm.sessions, sess.addr) + delete(sm.sessions, sess.id) sess.client.Close() sess.conn.Close() } @@ -263,12 +256,12 @@ func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bo sm.sessionsMut.Lock() defer sm.sessionsMut.Unlock() if subscribe { - sm.headerSubs[sess.addr] = sess + sm.headerSubs[sess.id] = sess sess.headersSub = true sess.headersSubRaw = raw return } - delete(sm.headerSubs, sess.addr) + delete(sm.headerSubs, sess.id) sess.headersSub = false sess.headersSubRaw = false } @@ -284,12 +277,12 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s subs = make(sessionMap) sm.hashXSubs[key] = subs } - subs[sess.addr] = sess + subs[sess.id] = sess sess.hashXSubs[key] = original return } if ok { - delete(subs, sess.addr) + delete(subs, sess.id) if len(subs) == 0 { delete(sm.hashXSubs, key) }