Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2022-10-29 17:42:24 +02:00
|
|
|
"bytes"
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
"encoding/hex"
|
2022-10-29 17:42:24 +02:00
|
|
|
"encoding/json"
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"net/rpc"
|
|
|
|
"net/rpc/jsonrpc"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
"github.com/lbryio/herald.go/db"
|
|
|
|
"github.com/lbryio/herald.go/internal"
|
|
|
|
"github.com/lbryio/lbcd/chaincfg"
|
2022-10-20 20:05:16 +02:00
|
|
|
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
2022-10-25 07:48:13 +02:00
|
|
|
"github.com/lbryio/lbry.go/v3/extras/stop"
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
)
|
|
|
|
|
|
|
|
type headerNotification struct {
|
|
|
|
internal.HeightHash
|
|
|
|
blockHeader [HEADER_SIZE]byte
|
|
|
|
blockHeaderElectrum *BlockHeaderElectrum
|
|
|
|
blockHeaderStr string
|
|
|
|
}
|
|
|
|
|
|
|
|
type hashXNotification struct {
|
|
|
|
hashX [HASHX_LEN]byte
|
|
|
|
status []byte
|
|
|
|
statusStr string
|
|
|
|
}
|
|
|
|
|
|
|
|
type session struct {
|
|
|
|
id uintptr
|
|
|
|
addr net.Addr
|
|
|
|
conn net.Conn
|
|
|
|
// hashXSubs maps hashX to the original subscription key (address or scripthash)
|
|
|
|
hashXSubs map[[HASHX_LEN]byte]string
|
|
|
|
// headersSub indicates header subscription
|
|
|
|
headersSub bool
|
|
|
|
// headersSubRaw indicates the header subscription mode
|
|
|
|
headersSubRaw bool
|
|
|
|
// client provides the ability to send notifications
|
|
|
|
client rpc.ClientCodec
|
|
|
|
clientSeq uint64
|
|
|
|
// lastRecv records time of last incoming data
|
|
|
|
lastRecv time.Time
|
|
|
|
// lastSend records time of last outgoing data
|
|
|
|
lastSend time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *session) doNotify(notification interface{}) {
|
|
|
|
var method string
|
|
|
|
var params interface{}
|
|
|
|
switch notification.(type) {
|
|
|
|
case headerNotification:
|
|
|
|
if !s.headersSub {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
note, _ := notification.(headerNotification)
|
|
|
|
heightHash := note.HeightHash
|
|
|
|
method = "blockchain.headers.subscribe"
|
|
|
|
if s.headersSubRaw {
|
|
|
|
header := note.blockHeaderStr
|
|
|
|
if len(header) == 0 {
|
|
|
|
header = hex.EncodeToString(note.blockHeader[:])
|
|
|
|
}
|
|
|
|
params = &HeadersSubscribeRawResp{
|
|
|
|
Hex: header,
|
|
|
|
Height: uint32(heightHash.Height),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
header := note.blockHeaderElectrum
|
|
|
|
if header == nil { // not initialized
|
|
|
|
header = newBlockHeaderElectrum(¬e.blockHeader, uint32(heightHash.Height))
|
|
|
|
}
|
|
|
|
params = header
|
|
|
|
}
|
|
|
|
case hashXNotification:
|
|
|
|
note, _ := notification.(hashXNotification)
|
|
|
|
orig, ok := s.hashXSubs[note.hashX]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(orig) == 64 {
|
|
|
|
method = "blockchain.scripthash.subscribe"
|
|
|
|
} else {
|
|
|
|
method = "blockchain.address.subscribe"
|
|
|
|
}
|
|
|
|
status := note.statusStr
|
|
|
|
if len(status) == 0 {
|
|
|
|
status = hex.EncodeToString(note.status)
|
|
|
|
}
|
|
|
|
params = []string{orig, status}
|
|
|
|
default:
|
|
|
|
log.Warnf("unknown notification type: %v", notification)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// Send the notification.
|
|
|
|
s.clientSeq += 1
|
|
|
|
req := &rpc.Request{
|
|
|
|
ServiceMethod: method,
|
|
|
|
Seq: s.clientSeq,
|
|
|
|
}
|
|
|
|
err := s.client.WriteRequest(req, params)
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error: %v", err)
|
|
|
|
}
|
|
|
|
// Bump last send time.
|
|
|
|
s.lastSend = time.Now()
|
|
|
|
}
|
|
|
|
|
|
|
|
type sessionMap map[uintptr]*session
|
|
|
|
|
|
|
|
type sessionManager struct {
|
|
|
|
// sessionsMut protects sessions, headerSubs, hashXSubs state
|
2022-10-25 07:48:13 +02:00
|
|
|
sessionsMut sync.RWMutex
|
|
|
|
sessions sessionMap
|
|
|
|
// sessionsWait sync.WaitGroup
|
|
|
|
grp *stop.Group
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
sessionsMax int
|
|
|
|
sessionTimeout time.Duration
|
|
|
|
manageTicker *time.Ticker
|
|
|
|
db *db.ReadOnlyDBColumnFamily
|
2022-10-25 07:48:13 +02:00
|
|
|
args *Args
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
chain *chaincfg.Params
|
|
|
|
// headerSubs are sessions subscribed via 'blockchain.headers.subscribe'
|
|
|
|
headerSubs sessionMap
|
|
|
|
// hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe'
|
|
|
|
hashXSubs map[[HASHX_LEN]byte]sessionMap
|
|
|
|
}
|
|
|
|
|
2022-10-25 07:48:13 +02:00
|
|
|
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
return &sessionManager{
|
|
|
|
sessions: make(sessionMap),
|
2022-10-25 07:48:13 +02:00
|
|
|
grp: grp,
|
|
|
|
sessionsMax: args.MaxSessions,
|
|
|
|
sessionTimeout: time.Duration(args.SessionTimeout) * time.Second,
|
|
|
|
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
db: db,
|
2022-10-25 07:48:13 +02:00
|
|
|
args: args,
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
chain: chain,
|
|
|
|
headerSubs: make(sessionMap),
|
|
|
|
hashXSubs: make(map[[HASHX_LEN]byte]sessionMap),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) start() {
|
2022-10-25 07:48:13 +02:00
|
|
|
sm.grp.Add(1)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
go sm.manage()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) stop() {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
defer sm.sessionsMut.Unlock()
|
|
|
|
sm.headerSubs = make(sessionMap)
|
|
|
|
sm.hashXSubs = make(map[[HASHX_LEN]byte]sessionMap)
|
|
|
|
for _, sess := range sm.sessions {
|
|
|
|
sess.client.Close()
|
|
|
|
sess.conn.Close()
|
|
|
|
}
|
|
|
|
sm.sessions = make(sessionMap)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) manage() {
|
|
|
|
for {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
for _, sess := range sm.sessions {
|
|
|
|
if time.Since(sess.lastRecv) > sm.sessionTimeout {
|
|
|
|
sm.removeSessionLocked(sess)
|
|
|
|
log.Infof("session %v timed out", sess.addr.String())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sm.sessionsMut.Unlock()
|
|
|
|
// Wait for next management clock tick.
|
2022-10-25 07:48:13 +02:00
|
|
|
select {
|
|
|
|
case <-sm.grp.Ch():
|
|
|
|
sm.grp.Done()
|
|
|
|
return
|
|
|
|
case <-sm.manageTicker.C:
|
|
|
|
continue
|
|
|
|
}
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) addSession(conn net.Conn) *session {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
sess := &session{
|
|
|
|
addr: conn.RemoteAddr(),
|
|
|
|
conn: conn,
|
|
|
|
hashXSubs: make(map[[11]byte]string),
|
|
|
|
client: jsonrpc.NewClientCodec(conn),
|
|
|
|
lastRecv: time.Now(),
|
|
|
|
}
|
|
|
|
sess.id = uintptr(unsafe.Pointer(sess))
|
|
|
|
sm.sessions[sess.id] = sess
|
|
|
|
sm.sessionsMut.Unlock()
|
|
|
|
|
|
|
|
// Create a new RPC server. These services are linked to the
|
|
|
|
// session, which allows RPC handlers to know the session for
|
|
|
|
// each request and update subscriptions.
|
|
|
|
s1 := rpc.NewServer()
|
|
|
|
|
2022-10-25 07:48:13 +02:00
|
|
|
// Register "server.{features,banner,version}" handlers.
|
|
|
|
serverSvc := &ServerService{sm.args}
|
|
|
|
err := s1.RegisterName("server", serverSvc)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
}
|
|
|
|
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
// Register "blockchain.claimtrie.*"" handlers.
|
|
|
|
claimtrieSvc := &ClaimtrieService{sm.db}
|
2022-10-25 07:48:13 +02:00
|
|
|
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
if err != nil {
|
2022-10-25 07:48:13 +02:00
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
}
|
|
|
|
|
2022-10-20 20:28:09 +02:00
|
|
|
// Register "blockchain.{block,address,scripthash,transaction}.*" handlers.
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
blockchainSvc := &BlockchainBlockService{sm.db, sm.chain}
|
|
|
|
err = s1.RegisterName("blockchain.block", blockchainSvc)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
goto fail
|
|
|
|
}
|
|
|
|
err = s1.RegisterName("blockchain.headers", &BlockchainHeadersService{sm.db, sm.chain, sm, sess})
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
goto fail
|
|
|
|
}
|
|
|
|
err = s1.RegisterName("blockchain.address", &BlockchainAddressService{sm.db, sm.chain, sm, sess})
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
goto fail
|
|
|
|
}
|
|
|
|
err = s1.RegisterName("blockchain.scripthash", &BlockchainScripthashService{sm.db, sm.chain, sm, sess})
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
goto fail
|
|
|
|
}
|
2022-10-20 20:28:09 +02:00
|
|
|
err = s1.RegisterName("blockchain.transaction", &BlockchainTransactionService{sm.db, sm.chain, sm})
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("RegisterName: %v\n", err)
|
|
|
|
goto fail
|
|
|
|
}
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
|
2022-10-25 07:48:13 +02:00
|
|
|
sm.grp.Add(1)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
go func() {
|
2022-10-29 17:42:24 +02:00
|
|
|
s1.ServeCodec(&sessionServerCodec{jsonrpc.NewServerCodec(newJsonPatchingCodec(conn)), sess})
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
log.Infof("session %v goroutine exit", sess.addr.String())
|
2022-10-25 07:48:13 +02:00
|
|
|
sm.grp.Done()
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
}()
|
|
|
|
return sess
|
|
|
|
|
|
|
|
fail:
|
|
|
|
sm.removeSession(sess)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) removeSession(sess *session) {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
defer sm.sessionsMut.Unlock()
|
|
|
|
sm.removeSessionLocked(sess)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) removeSessionLocked(sess *session) {
|
|
|
|
if sess.headersSub {
|
|
|
|
delete(sm.headerSubs, sess.id)
|
|
|
|
}
|
|
|
|
for hashX := range sess.hashXSubs {
|
|
|
|
subs, ok := sm.hashXSubs[hashX]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
delete(subs, sess.id)
|
|
|
|
}
|
|
|
|
delete(sm.sessions, sess.id)
|
|
|
|
sess.client.Close()
|
|
|
|
sess.conn.Close()
|
|
|
|
}
|
|
|
|
|
2022-10-20 20:05:16 +02:00
|
|
|
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
|
|
|
|
// TODO
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
defer sm.sessionsMut.Unlock()
|
|
|
|
if subscribe {
|
|
|
|
sm.headerSubs[sess.id] = sess
|
|
|
|
sess.headersSub = true
|
|
|
|
sess.headersSubRaw = raw
|
|
|
|
return
|
|
|
|
}
|
|
|
|
delete(sm.headerSubs, sess.id)
|
|
|
|
sess.headersSub = false
|
|
|
|
sess.headersSubRaw = false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original string, subscribe bool) {
|
|
|
|
sm.sessionsMut.Lock()
|
|
|
|
defer sm.sessionsMut.Unlock()
|
|
|
|
var key [HASHX_LEN]byte
|
|
|
|
copy(key[:], hashX)
|
|
|
|
subs, ok := sm.hashXSubs[key]
|
|
|
|
if subscribe {
|
|
|
|
if !ok {
|
|
|
|
subs = make(sessionMap)
|
|
|
|
sm.hashXSubs[key] = subs
|
|
|
|
}
|
|
|
|
subs[sess.id] = sess
|
|
|
|
sess.hashXSubs[key] = original
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if ok {
|
|
|
|
delete(subs, sess.id)
|
|
|
|
if len(subs) == 0 {
|
|
|
|
delete(sm.hashXSubs, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
delete(sess.hashXSubs, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sm *sessionManager) doNotify(notification interface{}) {
|
|
|
|
sm.sessionsMut.RLock()
|
|
|
|
var subsCopy sessionMap
|
|
|
|
switch notification.(type) {
|
|
|
|
case headerNotification:
|
|
|
|
note, _ := notification.(headerNotification)
|
|
|
|
subsCopy = sm.headerSubs
|
|
|
|
if len(subsCopy) > 0 {
|
|
|
|
note.blockHeaderElectrum = newBlockHeaderElectrum(¬e.blockHeader, uint32(note.Height))
|
|
|
|
note.blockHeaderStr = hex.EncodeToString(note.blockHeader[:])
|
|
|
|
}
|
|
|
|
case hashXNotification:
|
|
|
|
note, _ := notification.(hashXNotification)
|
|
|
|
hashXSubs, ok := sm.hashXSubs[note.hashX]
|
|
|
|
if ok {
|
|
|
|
subsCopy = hashXSubs
|
|
|
|
}
|
|
|
|
if len(subsCopy) > 0 {
|
|
|
|
note.statusStr = hex.EncodeToString(note.status)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
log.Warnf("unknown notification type: %v", notification)
|
|
|
|
}
|
|
|
|
sm.sessionsMut.RUnlock()
|
|
|
|
|
|
|
|
// Deliver notification to relevant sessions.
|
|
|
|
for _, sess := range subsCopy {
|
|
|
|
sess.doNotify(notification)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-29 17:42:24 +02:00
|
|
|
type sessionServerCodec struct {
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
rpc.ServerCodec
|
|
|
|
sess *session
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadRequestHeader provides ability to rewrite the incoming
|
|
|
|
// request "method" field. For example:
|
|
|
|
// blockchain.block.get_header -> blockchain.block.Get_header
|
|
|
|
// blockchain.address.listunspent -> blockchain.address.Listunspent
|
|
|
|
// This makes the "method" string compatible with rpc.Server
|
|
|
|
// requirements.
|
2022-10-29 17:42:24 +02:00
|
|
|
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
|
|
|
|
log.Infof("from %v receive header", c.sess.addr.String())
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
err := c.ServerCodec.ReadRequestHeader(req)
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
2022-10-29 17:42:24 +02:00
|
|
|
log.Infof("from %v receive header: %#v", c.sess.addr.String(), *req)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
rawMethod := req.ServiceMethod
|
|
|
|
parts := strings.Split(rawMethod, ".")
|
|
|
|
if len(parts) < 2 {
|
|
|
|
return fmt.Errorf("blockchain rpc: service/method ill-formed: %q", rawMethod)
|
|
|
|
}
|
|
|
|
service := strings.Join(parts[0:len(parts)-1], ".")
|
|
|
|
method := parts[len(parts)-1]
|
|
|
|
if len(method) < 1 {
|
|
|
|
return fmt.Errorf("blockchain rpc: method ill-formed: %q", method)
|
|
|
|
}
|
|
|
|
method = strings.ToUpper(string(method[0])) + string(method[1:])
|
|
|
|
req.ServiceMethod = service + "." + method
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadRequestBody wraps the regular implementation, but updates session stats too.
|
2022-10-29 17:42:24 +02:00
|
|
|
func (c *sessionServerCodec) ReadRequestBody(params any) error {
|
|
|
|
log.Infof("from %v receive body", c.sess.addr.String())
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
err := c.ServerCodec.ReadRequestBody(params)
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
2022-10-29 17:42:24 +02:00
|
|
|
log.Infof("from %v receive body: %#v", c.sess.addr.String(), params)
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
// Bump last receive time.
|
|
|
|
c.sess.lastRecv = time.Now()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// WriteResponse wraps the regular implementation, but updates session stats too.
|
2022-10-29 17:42:24 +02:00
|
|
|
func (c *sessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
log.Infof("respond to %v", c.sess.addr.String())
|
|
|
|
err := c.ServerCodec.WriteResponse(resp, reply)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Bump last send time.
|
|
|
|
c.sess.lastSend = time.Now()
|
|
|
|
return err
|
|
|
|
}
|
2022-10-29 17:42:24 +02:00
|
|
|
|
|
|
|
// serverRequest is a duplicate of serverRequest from
|
|
|
|
// net/rpc/jsonrpc/server.go with an added Version which
|
|
|
|
// we can check.
|
|
|
|
type serverRequest struct {
|
|
|
|
Version string `json:"jsonrpc"`
|
|
|
|
Method string `json:"method"`
|
|
|
|
Params *json.RawMessage `json:"params"`
|
|
|
|
Id *json.RawMessage `json:"id"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// serverResponse is a duplicate of serverResponse from
|
|
|
|
// net/rpc/jsonrpc/server.go with an added Version which
|
|
|
|
// we can set at will.
|
|
|
|
type serverResponse struct {
|
|
|
|
Version string `json:"jsonrpc"`
|
|
|
|
Id *json.RawMessage `json:"id"`
|
|
|
|
Result any `json:"result,omitempty"`
|
|
|
|
Error any `json:"error,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// jsonPatchingCodec is able to intercept the JSON requests/responses
|
|
|
|
// and tweak them. Currently, it appears we need to make several changes:
|
|
|
|
// 1) add "jsonrpc": "2.0" (or "jsonrpc": "1.0") in response
|
|
|
|
// 2) add newline to frame response
|
|
|
|
// 3) add "params": [] when "params" is missing
|
|
|
|
// 4) replace params ["arg1", "arg2", ...] with [["arg1", "arg2", ...]]
|
|
|
|
type jsonPatchingCodec struct {
|
|
|
|
conn net.Conn
|
|
|
|
inBuffer *bytes.Buffer
|
|
|
|
dec *json.Decoder
|
|
|
|
enc *json.Encoder
|
|
|
|
outBuffer *bytes.Buffer
|
|
|
|
}
|
|
|
|
|
|
|
|
func newJsonPatchingCodec(conn net.Conn) *jsonPatchingCodec {
|
|
|
|
buf1, buf2 := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
|
|
|
|
return &jsonPatchingCodec{
|
|
|
|
conn: conn,
|
|
|
|
inBuffer: buf1,
|
|
|
|
dec: json.NewDecoder(buf1),
|
|
|
|
enc: json.NewEncoder(buf2),
|
|
|
|
outBuffer: buf2,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) {
|
|
|
|
if c.outBuffer.Len() > 0 {
|
|
|
|
// Return remaining decoded bytes.
|
|
|
|
return c.outBuffer.Read(p)
|
|
|
|
}
|
|
|
|
// Buffer contents consumed. Try to decode more JSON.
|
|
|
|
|
|
|
|
// Read until framing newline. This allows us to print the raw request.
|
|
|
|
for !bytes.ContainsAny(c.inBuffer.Bytes(), "\n") {
|
|
|
|
var buf [1024]byte
|
|
|
|
n, err = c.conn.Read(buf[:])
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
c.inBuffer.Write(buf[:n])
|
|
|
|
}
|
|
|
|
log.Infof("raw request: %v", c.inBuffer.String())
|
|
|
|
|
|
|
|
var req serverRequest
|
|
|
|
err = c.dec.Decode(&req)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if req.Params != nil {
|
|
|
|
n := len(*req.Params)
|
|
|
|
if n < 2 || (*req.Params)[0] != '[' && (*req.Params)[n-1] != ']' {
|
|
|
|
// This is an error, but we're not going to try to correct it.
|
|
|
|
goto encode
|
|
|
|
}
|
|
|
|
// FIXME: The heuristics here don't cover all possibilities.
|
|
|
|
// For example: [{obj1}, {obj2}] or ["foo,bar"] would not
|
|
|
|
// be handled correctly.
|
|
|
|
bracketed := (*req.Params)[1 : n-1]
|
|
|
|
n = len(bracketed)
|
|
|
|
if n > 1 && (bracketed[0] == '{' || bracketed[0] == '[') {
|
|
|
|
// Probable single object or list argument.
|
|
|
|
goto encode
|
|
|
|
}
|
|
|
|
args := strings.Split(string(bracketed), ",")
|
|
|
|
if len(args) <= 1 {
|
|
|
|
// No commas at all. Definitely a single argument.
|
|
|
|
goto encode
|
|
|
|
}
|
|
|
|
// The params look like ["arg1", "arg2", "arg3", ...].
|
|
|
|
// We're in trouble because our jsonrpc library does not
|
|
|
|
// handle this. So pack these args in an inner list.
|
|
|
|
// The handler method will receive ONE list argument.
|
|
|
|
params := json.RawMessage(fmt.Sprintf("[[%s]]", bracketed))
|
|
|
|
req.Params = ¶ms
|
|
|
|
} else {
|
|
|
|
// Add empty argument list if params omitted.
|
|
|
|
params := json.RawMessage("[]")
|
|
|
|
req.Params = ¶ms
|
|
|
|
}
|
|
|
|
|
|
|
|
encode:
|
|
|
|
// Encode the request. This allows us to print the patched request.
|
|
|
|
buf, err := json.Marshal(req)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
log.Infof("patched request: %v", string(buf))
|
|
|
|
|
|
|
|
err = c.enc.Encode(req)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return c.outBuffer.Read(p)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *jsonPatchingCodec) Write(p []byte) (n int, err error) {
|
|
|
|
log.Infof("raw response: %v", string(p))
|
|
|
|
var resp serverResponse
|
|
|
|
err = json.Unmarshal(p, &resp)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add "jsonrpc": "2.0" if missing.
|
|
|
|
if len(resp.Version) == 0 {
|
|
|
|
resp.Version = "2.0"
|
|
|
|
}
|
|
|
|
|
|
|
|
buf, err := json.Marshal(resp)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
log.Infof("patched response: %v", string(buf))
|
|
|
|
|
|
|
|
// Add newline for framing.
|
|
|
|
return c.conn.Write(append(buf, '\n'))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *jsonPatchingCodec) Close() error {
|
|
|
|
return c.conn.Close()
|
|
|
|
}
|