Changes to make session.go testable. Conn created with Pipe()

used in testing has no unique Addr.
This commit is contained in:
Jonathan Moody 2022-09-29 12:10:06 -05:00
parent 813fd4590a
commit 3ddcbbb55d

View file

@ -9,12 +9,11 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"unsafe"
"github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcutil"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -32,6 +31,7 @@ type hashXNotification struct {
} }
type session struct { type session struct {
id uintptr
addr net.Addr addr net.Addr
conn net.Conn conn net.Conn
// hashXSubs maps hashX to the original subscription key (address or scripthash) // hashXSubs maps hashX to the original subscription key (address or scripthash)
@ -71,7 +71,7 @@ func (s *session) doNotify(notification interface{}) {
} }
} else { } else {
header := note.blockHeaderElectrum header := note.blockHeaderElectrum
if len(header.PrevBlockHash) == 0 { // not initialized if header == nil { // not initialized
header = newBlockHeaderElectrum(&note.blockHeader, uint32(heightHash.Height)) header = newBlockHeaderElectrum(&note.blockHeader, uint32(heightHash.Height))
} }
params = header params = header
@ -110,7 +110,7 @@ func (s *session) doNotify(notification interface{}) {
s.lastSend = time.Now() s.lastSend = time.Now()
} }
type sessionMap map[net.Addr]*session type sessionMap map[uintptr]*session
type sessionManager struct { type sessionManager struct {
// sessionsMut protects sessions, headerSubs, hashXSubs state // sessionsMut protects sessions, headerSubs, hashXSubs state
@ -165,20 +165,11 @@ func (sm *sessionManager) manage() {
} }
sm.sessionsMut.Unlock() sm.sessionsMut.Unlock()
// TEMPORARY TESTING: Send fake notification for specific address.
address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain)
script, _ := txscript.PayToAddrScript(address)
hashX := hashXScript(script, sm.chain)
note := hashXNotification{}
copy(note.hashX[:], hashX)
note.status = append(note.status, []byte("fake status bytes")...)
sm.doNotify(note)
dur, _ := time.ParseDuration("10s") dur, _ := time.ParseDuration("10s")
time.AfterFunc(dur, func() { sm.manage() }) time.AfterFunc(dur, func() { sm.manage() })
} }
func (sm *sessionManager) addSession(conn net.Conn) { func (sm *sessionManager) addSession(conn net.Conn) *session {
sm.sessionsMut.Lock() sm.sessionsMut.Lock()
sess := &session{ sess := &session{
addr: conn.RemoteAddr(), addr: conn.RemoteAddr(),
@ -187,7 +178,8 @@ func (sm *sessionManager) addSession(conn net.Conn) {
client: jsonrpc.NewClientCodec(conn), client: jsonrpc.NewClientCodec(conn),
lastRecv: time.Now(), lastRecv: time.Now(),
} }
sm.sessions[sess.addr] = sess sess.id = uintptr(unsafe.Pointer(sess))
sm.sessions[sess.id] = sess
sm.sessionsMut.Unlock() sm.sessionsMut.Unlock()
// Create a new RPC server. These services are linked to the // Create a new RPC server. These services are linked to the
@ -231,10 +223,11 @@ func (sm *sessionManager) addSession(conn net.Conn) {
log.Infof("session %v goroutine exit", sess.addr.String()) log.Infof("session %v goroutine exit", sess.addr.String())
sm.sessionsWait.Done() sm.sessionsWait.Done()
}() }()
return return sess
fail: fail:
sm.removeSession(sess) sm.removeSession(sess)
return nil
} }
func (sm *sessionManager) removeSession(sess *session) { func (sm *sessionManager) removeSession(sess *session) {
@ -245,16 +238,16 @@ func (sm *sessionManager) removeSession(sess *session) {
func (sm *sessionManager) removeSessionLocked(sess *session) { func (sm *sessionManager) removeSessionLocked(sess *session) {
if sess.headersSub { if sess.headersSub {
delete(sm.headerSubs, sess.addr) delete(sm.headerSubs, sess.id)
} }
for hashX := range sess.hashXSubs { for hashX := range sess.hashXSubs {
subs, ok := sm.hashXSubs[hashX] subs, ok := sm.hashXSubs[hashX]
if !ok { if !ok {
continue continue
} }
delete(subs, sess.addr) delete(subs, sess.id)
} }
delete(sm.sessions, sess.addr) delete(sm.sessions, sess.id)
sess.client.Close() sess.client.Close()
sess.conn.Close() sess.conn.Close()
} }
@ -263,12 +256,12 @@ func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bo
sm.sessionsMut.Lock() sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock() defer sm.sessionsMut.Unlock()
if subscribe { if subscribe {
sm.headerSubs[sess.addr] = sess sm.headerSubs[sess.id] = sess
sess.headersSub = true sess.headersSub = true
sess.headersSubRaw = raw sess.headersSubRaw = raw
return return
} }
delete(sm.headerSubs, sess.addr) delete(sm.headerSubs, sess.id)
sess.headersSub = false sess.headersSub = false
sess.headersSubRaw = false sess.headersSubRaw = false
} }
@ -284,12 +277,12 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s
subs = make(sessionMap) subs = make(sessionMap)
sm.hashXSubs[key] = subs sm.hashXSubs[key] = subs
} }
subs[sess.addr] = sess subs[sess.id] = sess
sess.hashXSubs[key] = original sess.hashXSubs[key] = original
return return
} }
if ok { if ok {
delete(subs, sess.addr) delete(subs, sess.id)
if len(subs) == 0 { if len(subs) == 0 {
delete(sm.hashXSubs, key) delete(sm.hashXSubs, key)
} }