Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
This commit is contained in:
parent
b9cb9d8c5a
commit
7f47de2949
3 changed files with 514 additions and 2 deletions
|
@ -26,16 +26,31 @@ type BlockchainBlockService struct {
|
|||
Chain *chaincfg.Params
|
||||
}
|
||||
|
||||
// BlockchainBlockService methods handle "blockchain.headers.*" RPCs
|
||||
type BlockchainHeadersService struct {
|
||||
DB *db.ReadOnlyDBColumnFamily
|
||||
Chain *chaincfg.Params
|
||||
// needed for subscribe/unsubscribe
|
||||
sessionMgr *sessionManager
|
||||
session *session
|
||||
}
|
||||
|
||||
// BlockchainAddressService methods handle "blockchain.address.*" RPCs
|
||||
type BlockchainAddressService struct {
|
||||
DB *db.ReadOnlyDBColumnFamily
|
||||
Chain *chaincfg.Params
|
||||
// needed for subscribe/unsubscribe
|
||||
sessionMgr *sessionManager
|
||||
session *session
|
||||
}
|
||||
|
||||
// BlockchainScripthashService methods handle "blockchain.scripthash.*" RPCs
|
||||
type BlockchainScripthashService struct {
|
||||
DB *db.ReadOnlyDBColumnFamily
|
||||
Chain *chaincfg.Params
|
||||
// needed for subscribe/unsubscribe
|
||||
sessionMgr *sessionManager
|
||||
session *session
|
||||
}
|
||||
|
||||
const CHUNK_SIZE = 96
|
||||
|
@ -177,6 +192,47 @@ func (s *BlockchainBlockService) Headers(req *BlockHeadersReq, resp **BlockHeade
|
|||
return err
|
||||
}
|
||||
|
||||
type HeadersSubscribeReq struct {
|
||||
Raw bool `json:"raw"`
|
||||
}
|
||||
|
||||
type HeadersSubscribeResp struct {
|
||||
BlockHeaderElectrum
|
||||
}
|
||||
type HeadersSubscribeRawResp struct {
|
||||
Hex string `json:"hex"`
|
||||
Height uint32 `json:"height"`
|
||||
}
|
||||
|
||||
// 'blockchain.headers.subscribe'
|
||||
func (s *BlockchainHeadersService) Subscribe(req *HeadersSubscribeReq, resp *interface{}) error {
|
||||
if s.sessionMgr == nil || s.session == nil {
|
||||
return errors.New("no session, rpc not supported")
|
||||
}
|
||||
s.sessionMgr.headersSubscribe(s.session, req.Raw, true /*subscribe*/)
|
||||
height := s.DB.Height
|
||||
if s.DB.LastState != nil {
|
||||
height = s.DB.LastState.Height
|
||||
}
|
||||
headers, err := s.DB.GetHeaders(height, 1)
|
||||
if err != nil {
|
||||
s.sessionMgr.headersSubscribe(s.session, req.Raw, false /*subscribe*/)
|
||||
return err
|
||||
}
|
||||
if len(headers) < 1 {
|
||||
return errors.New("not found")
|
||||
}
|
||||
if req.Raw {
|
||||
*resp = &HeadersSubscribeRawResp{
|
||||
Hex: hex.EncodeToString(headers[0][:]),
|
||||
Height: height,
|
||||
}
|
||||
} else {
|
||||
*resp = &HeadersSubscribeResp{*newBlockHeaderElectrum(&headers[0], height)}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func decodeScriptHash(scripthash string) ([]byte, error) {
|
||||
sh, err := hex.DecodeString(scripthash)
|
||||
if err != nil {
|
||||
|
@ -449,3 +505,94 @@ func (s *BlockchainScripthashService) Listunspent(req *ScripthashListUnspentReq,
|
|||
*resp = &result
|
||||
return err
|
||||
}
|
||||
|
||||
type AddressSubscribeReq []string
|
||||
type AddressSubscribeResp []string
|
||||
|
||||
// 'blockchain.address.subscribe'
|
||||
func (s *BlockchainAddressService) Subscribe(req *AddressSubscribeReq, resp **AddressSubscribeResp) error {
|
||||
if s.sessionMgr == nil || s.session == nil {
|
||||
return errors.New("no session, rpc not supported")
|
||||
}
|
||||
result := make([]string, 0, len(*req))
|
||||
for _, addr := range *req {
|
||||
address, err := lbcutil.DecodeAddress(addr, s.Chain)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
script, err := txscript.PayToAddrScript(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashX := hashXScript(script, s.Chain)
|
||||
s.sessionMgr.hashXSubscribe(s.session, hashX, addr, true /*subscribe*/)
|
||||
status, err := s.DB.GetStatus(hashX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = append(result, hex.EncodeToString(status))
|
||||
}
|
||||
*resp = (*AddressSubscribeResp)(&result)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 'blockchain.address.unsubscribe'
|
||||
func (s *BlockchainAddressService) Unsubscribe(req *AddressSubscribeReq, resp **AddressSubscribeResp) error {
|
||||
if s.sessionMgr == nil || s.session == nil {
|
||||
return errors.New("no session, rpc not supported")
|
||||
}
|
||||
for _, addr := range *req {
|
||||
address, err := lbcutil.DecodeAddress(addr, s.Chain)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
script, err := txscript.PayToAddrScript(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashX := hashXScript(script, s.Chain)
|
||||
s.sessionMgr.hashXSubscribe(s.session, hashX, addr, false /*subscribe*/)
|
||||
}
|
||||
*resp = (*AddressSubscribeResp)(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
type ScripthashSubscribeReq string
|
||||
type ScripthashSubscribeResp string
|
||||
|
||||
// 'blockchain.scripthash.subscribe'
|
||||
func (s *BlockchainScripthashService) Subscribe(req *ScripthashSubscribeReq, resp **ScripthashSubscribeResp) error {
|
||||
if s.sessionMgr == nil || s.session == nil {
|
||||
return errors.New("no session, rpc not supported")
|
||||
}
|
||||
var result string
|
||||
scripthash, err := decodeScriptHash(string(*req))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashX := hashX(scripthash)
|
||||
s.sessionMgr.hashXSubscribe(s.session, hashX, string(*req), true /*subscribe*/)
|
||||
|
||||
status, err := s.DB.GetStatus(hashX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = hex.EncodeToString(status)
|
||||
*resp = (*ScripthashSubscribeResp)(&result)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 'blockchain.scripthash.unsubscribe'
|
||||
func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, resp **ScripthashSubscribeResp) error {
|
||||
if s.sessionMgr == nil || s.session == nil {
|
||||
return errors.New("no session, rpc not supported")
|
||||
}
|
||||
scripthash, err := decodeScriptHash(string(*req))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashX := hashX(scripthash)
|
||||
s.sessionMgr.hashXSubscribe(s.session, hashX, string(*req), false /*subscribe*/)
|
||||
*resp = (*ScripthashSubscribeResp)(nil)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -67,11 +67,11 @@ func (s *Server) StartJsonRPC() error {
|
|||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain}, "blockchain_address")
|
||||
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address")
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain}, "blockchain_scripthash")
|
||||
err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash")
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
|
|
365
server/session.go
Normal file
365
server/session.go
Normal file
|
@ -0,0 +1,365 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/herald.go/db"
|
||||
"github.com/lbryio/herald.go/internal"
|
||||
"github.com/lbryio/lbcd/chaincfg"
|
||||
"github.com/lbryio/lbcd/txscript"
|
||||
"github.com/lbryio/lbcutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var SESSION_INACTIVE_TIMEOUT = 2 * time.Minute
|
||||
|
||||
type headerNotification struct {
|
||||
internal.HeightHash
|
||||
blockHeader [HEADER_SIZE]byte
|
||||
blockHeaderElectrum *BlockHeaderElectrum
|
||||
blockHeaderStr string
|
||||
}
|
||||
|
||||
type hashXNotification struct {
|
||||
hashX [HASHX_LEN]byte
|
||||
status []byte
|
||||
statusStr string
|
||||
}
|
||||
|
||||
type session struct {
|
||||
addr net.Addr
|
||||
conn net.Conn
|
||||
// hashXSubs maps hashX to the original subscription key (address or scripthash)
|
||||
hashXSubs map[[HASHX_LEN]byte]string
|
||||
// headersSub indicates header subscription
|
||||
headersSub bool
|
||||
// headersSubRaw indicates the header subscription mode
|
||||
headersSubRaw bool
|
||||
// client provides the ability to send notifications
|
||||
client rpc.ClientCodec
|
||||
clientSeq uint64
|
||||
// lastRecv records time of last incoming data
|
||||
lastRecv time.Time
|
||||
// lastSend records time of last outgoing data
|
||||
lastSend time.Time
|
||||
}
|
||||
|
||||
func (s *session) doNotify(notification interface{}) {
|
||||
var method string
|
||||
var params interface{}
|
||||
switch notification.(type) {
|
||||
case headerNotification:
|
||||
if !s.headersSub {
|
||||
return
|
||||
}
|
||||
note, _ := notification.(headerNotification)
|
||||
heightHash := note.HeightHash
|
||||
method = "blockchain.headers.subscribe"
|
||||
if s.headersSubRaw {
|
||||
header := note.blockHeaderStr
|
||||
if len(header) == 0 {
|
||||
header = hex.EncodeToString(note.blockHeader[:])
|
||||
}
|
||||
params = &HeadersSubscribeRawResp{
|
||||
Hex: header,
|
||||
Height: uint32(heightHash.Height),
|
||||
}
|
||||
} else {
|
||||
header := note.blockHeaderElectrum
|
||||
if len(header.PrevBlockHash) == 0 { // not initialized
|
||||
header = newBlockHeaderElectrum(¬e.blockHeader, uint32(heightHash.Height))
|
||||
}
|
||||
params = header
|
||||
}
|
||||
case hashXNotification:
|
||||
note, _ := notification.(hashXNotification)
|
||||
orig, ok := s.hashXSubs[note.hashX]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if len(orig) == 64 {
|
||||
method = "blockchain.scripthash.subscribe"
|
||||
} else {
|
||||
method = "blockchain.address.subscribe"
|
||||
}
|
||||
status := note.statusStr
|
||||
if len(status) == 0 {
|
||||
status = hex.EncodeToString(note.status)
|
||||
}
|
||||
params = []string{orig, status}
|
||||
default:
|
||||
log.Warnf("unknown notification type: %v", notification)
|
||||
return
|
||||
}
|
||||
// Send the notification.
|
||||
s.clientSeq += 1
|
||||
req := &rpc.Request{
|
||||
ServiceMethod: method,
|
||||
Seq: s.clientSeq,
|
||||
}
|
||||
err := s.client.WriteRequest(req, params)
|
||||
if err != nil {
|
||||
log.Warnf("error: %v", err)
|
||||
}
|
||||
// Bump last send time.
|
||||
s.lastSend = time.Now()
|
||||
}
|
||||
|
||||
type sessionMap map[net.Addr]*session
|
||||
|
||||
type sessionManager struct {
|
||||
// sessionsMut protects sessions, headerSubs, hashXSubs state
|
||||
sessionsMut sync.RWMutex
|
||||
sessions sessionMap
|
||||
db *db.ReadOnlyDBColumnFamily
|
||||
chain *chaincfg.Params
|
||||
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
||||
headerSubs sessionMap
|
||||
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
|
||||
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
||||
}
|
||||
|
||||
func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params) *sessionManager {
|
||||
return &sessionManager{
|
||||
sessions: make(sessionMap),
|
||||
db: db,
|
||||
chain: chain,
|
||||
headerSubs: make(sessionMap),
|
||||
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *sessionManager) start() {
|
||||
go sm.manage()
|
||||
}
|
||||
|
||||
func (sm *sessionManager) stop() {
|
||||
sm.sessionsMut.Lock()
|
||||
defer sm.sessionsMut.Unlock()
|
||||
sm.headerSubs = make(sessionMap)
|
||||
sm.hashXSubs = make(map[[HASHX_LEN]byte]sessionMap)
|
||||
for _, sess := range sm.sessions {
|
||||
sess.client.Close()
|
||||
sess.conn.Close()
|
||||
}
|
||||
sm.sessions = make(sessionMap)
|
||||
}
|
||||
|
||||
func (sm *sessionManager) manage() {
|
||||
for _, sess := range sm.sessions {
|
||||
if time.Since(sess.lastRecv) > SESSION_INACTIVE_TIMEOUT {
|
||||
sm.removeSession(sess)
|
||||
log.Infof("session %v timed out", sess.addr.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TEMPORARY TESTING: Send fake notification for specific address.
|
||||
address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain)
|
||||
script, _ := txscript.PayToAddrScript(address)
|
||||
hashX := hashXScript(script, sm.chain)
|
||||
note := hashXNotification{}
|
||||
copy(note.hashX[:], hashX)
|
||||
note.status = append(note.status, []byte("fake status bytes")...)
|
||||
sm.doNotify(note)
|
||||
|
||||
dur, _ := time.ParseDuration("10s")
|
||||
time.AfterFunc(dur, func() { sm.manage() })
|
||||
}
|
||||
|
||||
func (sm *sessionManager) addSession(conn net.Conn) {
|
||||
sm.sessionsMut.Lock()
|
||||
sess := &session{
|
||||
addr: conn.RemoteAddr(),
|
||||
conn: conn,
|
||||
hashXSubs: make(map[[11]byte]string),
|
||||
client: jsonrpc.NewClientCodec(conn),
|
||||
lastRecv: time.Now(),
|
||||
}
|
||||
sm.sessions[sess.addr] = sess
|
||||
sm.sessionsMut.Unlock()
|
||||
|
||||
// Create a new RPC server. These services are linked to the
|
||||
// session, which allows RPC handlers to know the session for
|
||||
// each request and update subscriptions.
|
||||
s1 := rpc.NewServer()
|
||||
|
||||
// Register "blockchain.claimtrie.*"" handlers.
|
||||
claimtrieSvc := &ClaimtrieService{sm.db}
|
||||
err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
|
||||
// Register other "blockchain.{block,address,scripthash}.*" handlers.
|
||||
blockchainSvc := &BlockchainBlockService{sm.db, sm.chain}
|
||||
err = s1.RegisterName("blockchain.block", blockchainSvc)
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
err = s1.RegisterName("blockchain.headers", &BlockchainHeadersService{sm.db, sm.chain, sm, sess})
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
err = s1.RegisterName("blockchain.address", &BlockchainAddressService{sm.db, sm.chain, sm, sess})
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
err = s1.RegisterName("blockchain.scripthash", &BlockchainScripthashService{sm.db, sm.chain, sm, sess})
|
||||
if err != nil {
|
||||
log.Errorf("RegisterService: %v\n", err)
|
||||
}
|
||||
|
||||
go s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
|
||||
}
|
||||
|
||||
func (sm *sessionManager) removeSession(sess *session) {
|
||||
sm.sessionsMut.Lock()
|
||||
defer sm.sessionsMut.Unlock()
|
||||
if sess.headersSub {
|
||||
delete(sm.headerSubs, sess.addr)
|
||||
}
|
||||
for hashX := range sess.hashXSubs {
|
||||
subs, ok := sm.hashXSubs[hashX]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
delete(subs, sess.addr)
|
||||
}
|
||||
delete(sm.sessions, sess.addr)
|
||||
sess.client.Close()
|
||||
sess.conn.Close()
|
||||
}
|
||||
|
||||
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
|
||||
sm.sessionsMut.Lock()
|
||||
defer sm.sessionsMut.Unlock()
|
||||
if subscribe {
|
||||
sm.headerSubs[sess.addr] = sess
|
||||
sess.headersSub = true
|
||||
sess.headersSubRaw = raw
|
||||
return
|
||||
}
|
||||
delete(sm.headerSubs, sess.addr)
|
||||
sess.headersSub = false
|
||||
sess.headersSubRaw = false
|
||||
}
|
||||
|
||||
func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original string, subscribe bool) {
|
||||
sm.sessionsMut.Lock()
|
||||
defer sm.sessionsMut.Unlock()
|
||||
var key [HASHX_LEN]byte
|
||||
copy(key[:], hashX)
|
||||
subs, ok := sm.hashXSubs[key]
|
||||
if subscribe {
|
||||
if !ok {
|
||||
subs = make(sessionMap)
|
||||
sm.hashXSubs[key] = subs
|
||||
}
|
||||
subs[sess.addr] = sess
|
||||
sess.hashXSubs[key] = original
|
||||
return
|
||||
}
|
||||
if ok {
|
||||
delete(subs, sess.addr)
|
||||
if len(subs) == 0 {
|
||||
delete(sm.hashXSubs, key)
|
||||
}
|
||||
}
|
||||
delete(sess.hashXSubs, key)
|
||||
}
|
||||
|
||||
func (sm *sessionManager) doNotify(notification interface{}) {
|
||||
sm.sessionsMut.RLock()
|
||||
var subsCopy sessionMap
|
||||
switch notification.(type) {
|
||||
case headerNotification:
|
||||
note, _ := notification.(headerNotification)
|
||||
subsCopy = sm.headerSubs
|
||||
if len(subsCopy) > 0 {
|
||||
note.blockHeaderElectrum = newBlockHeaderElectrum(¬e.blockHeader, uint32(note.Height))
|
||||
note.blockHeaderStr = hex.EncodeToString(note.blockHeader[:])
|
||||
}
|
||||
case hashXNotification:
|
||||
note, _ := notification.(hashXNotification)
|
||||
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
||||
if ok {
|
||||
subsCopy = hashXSubs
|
||||
}
|
||||
if len(subsCopy) > 0 {
|
||||
note.statusStr = hex.EncodeToString(note.status)
|
||||
}
|
||||
default:
|
||||
log.Warnf("unknown notification type: %v", notification)
|
||||
}
|
||||
sm.sessionsMut.RUnlock()
|
||||
|
||||
// Deliver notification to relevant sessions.
|
||||
for _, sess := range subsCopy {
|
||||
sess.doNotify(notification)
|
||||
}
|
||||
}
|
||||
|
||||
type SessionServerCodec struct {
|
||||
rpc.ServerCodec
|
||||
sess *session
|
||||
}
|
||||
|
||||
// ReadRequestHeader provides ability to rewrite the incoming
|
||||
// request "method" field. For example:
|
||||
// blockchain.block.get_header -> blockchain.block.Get_header
|
||||
// blockchain.address.listunspent -> blockchain.address.Listunspent
|
||||
// This makes the "method" string compatible with rpc.Server
|
||||
// requirements.
|
||||
func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
|
||||
log.Infof("receive header from %v", c.sess.addr.String())
|
||||
err := c.ServerCodec.ReadRequestHeader(req)
|
||||
if err != nil {
|
||||
log.Warnf("error: %v", err)
|
||||
return err
|
||||
}
|
||||
rawMethod := req.ServiceMethod
|
||||
parts := strings.Split(rawMethod, ".")
|
||||
if len(parts) < 2 {
|
||||
return fmt.Errorf("blockchain rpc: service/method ill-formed: %q", rawMethod)
|
||||
}
|
||||
service := strings.Join(parts[0:len(parts)-1], ".")
|
||||
method := parts[len(parts)-1]
|
||||
if len(method) < 1 {
|
||||
return fmt.Errorf("blockchain rpc: method ill-formed: %q", method)
|
||||
}
|
||||
method = strings.ToUpper(string(method[0])) + string(method[1:])
|
||||
req.ServiceMethod = service + "." + method
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadRequestBody wraps the regular implementation, but updates session stats too.
|
||||
func (c *SessionServerCodec) ReadRequestBody(params any) error {
|
||||
err := c.ServerCodec.ReadRequestBody(params)
|
||||
if err != nil {
|
||||
log.Warnf("error: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Infof("receive body from %v", c.sess.addr.String())
|
||||
// Bump last receive time.
|
||||
c.sess.lastRecv = time.Now()
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteResponse wraps the regular implementation, but updates session stats too.
|
||||
func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
|
||||
log.Infof("respond to %v", c.sess.addr.String())
|
||||
err := c.ServerCodec.WriteResponse(resp, reply)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Bump last send time.
|
||||
c.sess.lastSend = time.Now()
|
||||
return err
|
||||
}
|
Loading…
Reference in a new issue