diff --git a/server/jsonrpc_blockchain.go b/server/jsonrpc_blockchain.go index 11cd935..6b0f9ef 100644 --- a/server/jsonrpc_blockchain.go +++ b/server/jsonrpc_blockchain.go @@ -26,16 +26,31 @@ type BlockchainBlockService struct { Chain *chaincfg.Params } +// BlockchainBlockService methods handle "blockchain.headers.*" RPCs +type BlockchainHeadersService struct { + DB *db.ReadOnlyDBColumnFamily + Chain *chaincfg.Params + // needed for subscribe/unsubscribe + sessionMgr *sessionManager + session *session +} + // BlockchainAddressService methods handle "blockchain.address.*" RPCs type BlockchainAddressService struct { DB *db.ReadOnlyDBColumnFamily Chain *chaincfg.Params + // needed for subscribe/unsubscribe + sessionMgr *sessionManager + session *session } // BlockchainScripthashService methods handle "blockchain.scripthash.*" RPCs type BlockchainScripthashService struct { DB *db.ReadOnlyDBColumnFamily Chain *chaincfg.Params + // needed for subscribe/unsubscribe + sessionMgr *sessionManager + session *session } const CHUNK_SIZE = 96 @@ -177,6 +192,47 @@ func (s *BlockchainBlockService) Headers(req *BlockHeadersReq, resp **BlockHeade return err } +type HeadersSubscribeReq struct { + Raw bool `json:"raw"` +} + +type HeadersSubscribeResp struct { + BlockHeaderElectrum +} +type HeadersSubscribeRawResp struct { + Hex string `json:"hex"` + Height uint32 `json:"height"` +} + +// 'blockchain.headers.subscribe' +func (s *BlockchainHeadersService) Subscribe(req *HeadersSubscribeReq, resp *interface{}) error { + if s.sessionMgr == nil || s.session == nil { + return errors.New("no session, rpc not supported") + } + s.sessionMgr.headersSubscribe(s.session, req.Raw, true /*subscribe*/) + height := s.DB.Height + if s.DB.LastState != nil { + height = s.DB.LastState.Height + } + headers, err := s.DB.GetHeaders(height, 1) + if err != nil { + s.sessionMgr.headersSubscribe(s.session, req.Raw, false /*subscribe*/) + return err + } + if len(headers) < 1 { + return errors.New("not found") + } + if req.Raw { + *resp = &HeadersSubscribeRawResp{ + Hex: hex.EncodeToString(headers[0][:]), + Height: height, + } + } else { + *resp = &HeadersSubscribeResp{*newBlockHeaderElectrum(&headers[0], height)} + } + return err +} + func decodeScriptHash(scripthash string) ([]byte, error) { sh, err := hex.DecodeString(scripthash) if err != nil { @@ -449,3 +505,94 @@ func (s *BlockchainScripthashService) Listunspent(req *ScripthashListUnspentReq, *resp = &result return err } + +type AddressSubscribeReq []string +type AddressSubscribeResp []string + +// 'blockchain.address.subscribe' +func (s *BlockchainAddressService) Subscribe(req *AddressSubscribeReq, resp **AddressSubscribeResp) error { + if s.sessionMgr == nil || s.session == nil { + return errors.New("no session, rpc not supported") + } + result := make([]string, 0, len(*req)) + for _, addr := range *req { + address, err := lbcutil.DecodeAddress(addr, s.Chain) + if err != nil { + return err + } + script, err := txscript.PayToAddrScript(address) + if err != nil { + return err + } + hashX := hashXScript(script, s.Chain) + s.sessionMgr.hashXSubscribe(s.session, hashX, addr, true /*subscribe*/) + status, err := s.DB.GetStatus(hashX) + if err != nil { + return err + } + result = append(result, hex.EncodeToString(status)) + } + *resp = (*AddressSubscribeResp)(&result) + return nil +} + +// 'blockchain.address.unsubscribe' +func (s *BlockchainAddressService) Unsubscribe(req *AddressSubscribeReq, resp **AddressSubscribeResp) error { + if s.sessionMgr == nil || s.session == nil { + return errors.New("no session, rpc not supported") + } + for _, addr := range *req { + address, err := lbcutil.DecodeAddress(addr, s.Chain) + if err != nil { + return err + } + script, err := txscript.PayToAddrScript(address) + if err != nil { + return err + } + hashX := hashXScript(script, s.Chain) + s.sessionMgr.hashXSubscribe(s.session, hashX, addr, false /*subscribe*/) + } + *resp = (*AddressSubscribeResp)(nil) + return nil +} + +type ScripthashSubscribeReq string +type ScripthashSubscribeResp string + +// 'blockchain.scripthash.subscribe' +func (s *BlockchainScripthashService) Subscribe(req *ScripthashSubscribeReq, resp **ScripthashSubscribeResp) error { + if s.sessionMgr == nil || s.session == nil { + return errors.New("no session, rpc not supported") + } + var result string + scripthash, err := decodeScriptHash(string(*req)) + if err != nil { + return err + } + hashX := hashX(scripthash) + s.sessionMgr.hashXSubscribe(s.session, hashX, string(*req), true /*subscribe*/) + + status, err := s.DB.GetStatus(hashX) + if err != nil { + return err + } + result = hex.EncodeToString(status) + *resp = (*ScripthashSubscribeResp)(&result) + return nil +} + +// 'blockchain.scripthash.unsubscribe' +func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, resp **ScripthashSubscribeResp) error { + if s.sessionMgr == nil || s.session == nil { + return errors.New("no session, rpc not supported") + } + scripthash, err := decodeScriptHash(string(*req)) + if err != nil { + return err + } + hashX := hashX(scripthash) + s.sessionMgr.hashXSubscribe(s.session, hashX, string(*req), false /*subscribe*/) + *resp = (*ScripthashSubscribeResp)(nil) + return nil +} diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index 3df87ce..9d42b9d 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -67,11 +67,11 @@ func (s *Server) StartJsonRPC() error { if err != nil { log.Errorf("RegisterService: %v\n", err) } - err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain}, "blockchain_address") + err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") if err != nil { log.Errorf("RegisterService: %v\n", err) } - err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain}, "blockchain_scripthash") + err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash") if err != nil { log.Errorf("RegisterService: %v\n", err) } diff --git a/server/session.go b/server/session.go new file mode 100644 index 0000000..4b95ae4 --- /dev/null +++ b/server/session.go @@ -0,0 +1,365 @@ +package server + +import ( + "encoding/hex" + "fmt" + "net" + "net/rpc" + "net/rpc/jsonrpc" + "strings" + "sync" + "time" + + "github.com/lbryio/herald.go/db" + "github.com/lbryio/herald.go/internal" + "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbcd/txscript" + "github.com/lbryio/lbcutil" + log "github.com/sirupsen/logrus" +) + +var SESSION_INACTIVE_TIMEOUT = 2 * time.Minute + +type headerNotification struct { + internal.HeightHash + blockHeader [HEADER_SIZE]byte + blockHeaderElectrum *BlockHeaderElectrum + blockHeaderStr string +} + +type hashXNotification struct { + hashX [HASHX_LEN]byte + status []byte + statusStr string +} + +type session struct { + addr net.Addr + conn net.Conn + // hashXSubs maps hashX to the original subscription key (address or scripthash) + hashXSubs map[[HASHX_LEN]byte]string + // headersSub indicates header subscription + headersSub bool + // headersSubRaw indicates the header subscription mode + headersSubRaw bool + // client provides the ability to send notifications + client rpc.ClientCodec + clientSeq uint64 + // lastRecv records time of last incoming data + lastRecv time.Time + // lastSend records time of last outgoing data + lastSend time.Time +} + +func (s *session) doNotify(notification interface{}) { + var method string + var params interface{} + switch notification.(type) { + case headerNotification: + if !s.headersSub { + return + } + note, _ := notification.(headerNotification) + heightHash := note.HeightHash + method = "blockchain.headers.subscribe" + if s.headersSubRaw { + header := note.blockHeaderStr + if len(header) == 0 { + header = hex.EncodeToString(note.blockHeader[:]) + } + params = &HeadersSubscribeRawResp{ + Hex: header, + Height: uint32(heightHash.Height), + } + } else { + header := note.blockHeaderElectrum + if len(header.PrevBlockHash) == 0 { // not initialized + header = newBlockHeaderElectrum(¬e.blockHeader, uint32(heightHash.Height)) + } + params = header + } + case hashXNotification: + note, _ := notification.(hashXNotification) + orig, ok := s.hashXSubs[note.hashX] + if !ok { + return + } + if len(orig) == 64 { + method = "blockchain.scripthash.subscribe" + } else { + method = "blockchain.address.subscribe" + } + status := note.statusStr + if len(status) == 0 { + status = hex.EncodeToString(note.status) + } + params = []string{orig, status} + default: + log.Warnf("unknown notification type: %v", notification) + return + } + // Send the notification. + s.clientSeq += 1 + req := &rpc.Request{ + ServiceMethod: method, + Seq: s.clientSeq, + } + err := s.client.WriteRequest(req, params) + if err != nil { + log.Warnf("error: %v", err) + } + // Bump last send time. + s.lastSend = time.Now() +} + +type sessionMap map[net.Addr]*session + +type sessionManager struct { + // sessionsMut protects sessions, headerSubs, hashXSubs state + sessionsMut sync.RWMutex + sessions sessionMap + db *db.ReadOnlyDBColumnFamily + chain *chaincfg.Params + // headerSubs are sessions subscribed via 'blockchain.headers.subscribe' + headerSubs sessionMap + // hashXSubs are sessions subscribed via 'blockchain.{address,scripthash}.subscribe' + hashXSubs map[[HASHX_LEN]byte]sessionMap +} + +func newSessionManager(db *db.ReadOnlyDBColumnFamily, chain *chaincfg.Params) *sessionManager { + return &sessionManager{ + sessions: make(sessionMap), + db: db, + chain: chain, + headerSubs: make(sessionMap), + hashXSubs: make(map[[HASHX_LEN]byte]sessionMap), + } +} + +func (sm *sessionManager) start() { + go sm.manage() +} + +func (sm *sessionManager) stop() { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + sm.headerSubs = make(sessionMap) + sm.hashXSubs = make(map[[HASHX_LEN]byte]sessionMap) + for _, sess := range sm.sessions { + sess.client.Close() + sess.conn.Close() + } + sm.sessions = make(sessionMap) +} + +func (sm *sessionManager) manage() { + for _, sess := range sm.sessions { + if time.Since(sess.lastRecv) > SESSION_INACTIVE_TIMEOUT { + sm.removeSession(sess) + log.Infof("session %v timed out", sess.addr.String()) + } + } + + // TEMPORARY TESTING: Send fake notification for specific address. + address, _ := lbcutil.DecodeAddress("bNe63fYgYNA85ZQ56p7MwBtuCL7MXPRfrm", sm.chain) + script, _ := txscript.PayToAddrScript(address) + hashX := hashXScript(script, sm.chain) + note := hashXNotification{} + copy(note.hashX[:], hashX) + note.status = append(note.status, []byte("fake status bytes")...) + sm.doNotify(note) + + dur, _ := time.ParseDuration("10s") + time.AfterFunc(dur, func() { sm.manage() }) +} + +func (sm *sessionManager) addSession(conn net.Conn) { + sm.sessionsMut.Lock() + sess := &session{ + addr: conn.RemoteAddr(), + conn: conn, + hashXSubs: make(map[[11]byte]string), + client: jsonrpc.NewClientCodec(conn), + lastRecv: time.Now(), + } + sm.sessions[sess.addr] = sess + sm.sessionsMut.Unlock() + + // Create a new RPC server. These services are linked to the + // session, which allows RPC handlers to know the session for + // each request and update subscriptions. + s1 := rpc.NewServer() + + // Register "blockchain.claimtrie.*"" handlers. + claimtrieSvc := &ClaimtrieService{sm.db} + err := s1.RegisterName("blockchain.claimtrie", claimtrieSvc) + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + + // Register other "blockchain.{block,address,scripthash}.*" handlers. + blockchainSvc := &BlockchainBlockService{sm.db, sm.chain} + err = s1.RegisterName("blockchain.block", blockchainSvc) + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + err = s1.RegisterName("blockchain.headers", &BlockchainHeadersService{sm.db, sm.chain, sm, sess}) + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + err = s1.RegisterName("blockchain.address", &BlockchainAddressService{sm.db, sm.chain, sm, sess}) + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + err = s1.RegisterName("blockchain.scripthash", &BlockchainScripthashService{sm.db, sm.chain, sm, sess}) + if err != nil { + log.Errorf("RegisterService: %v\n", err) + } + + go s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) +} + +func (sm *sessionManager) removeSession(sess *session) { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + if sess.headersSub { + delete(sm.headerSubs, sess.addr) + } + for hashX := range sess.hashXSubs { + subs, ok := sm.hashXSubs[hashX] + if !ok { + continue + } + delete(subs, sess.addr) + } + delete(sm.sessions, sess.addr) + sess.client.Close() + sess.conn.Close() +} + +func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + if subscribe { + sm.headerSubs[sess.addr] = sess + sess.headersSub = true + sess.headersSubRaw = raw + return + } + delete(sm.headerSubs, sess.addr) + sess.headersSub = false + sess.headersSubRaw = false +} + +func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original string, subscribe bool) { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + var key [HASHX_LEN]byte + copy(key[:], hashX) + subs, ok := sm.hashXSubs[key] + if subscribe { + if !ok { + subs = make(sessionMap) + sm.hashXSubs[key] = subs + } + subs[sess.addr] = sess + sess.hashXSubs[key] = original + return + } + if ok { + delete(subs, sess.addr) + if len(subs) == 0 { + delete(sm.hashXSubs, key) + } + } + delete(sess.hashXSubs, key) +} + +func (sm *sessionManager) doNotify(notification interface{}) { + sm.sessionsMut.RLock() + var subsCopy sessionMap + switch notification.(type) { + case headerNotification: + note, _ := notification.(headerNotification) + subsCopy = sm.headerSubs + if len(subsCopy) > 0 { + note.blockHeaderElectrum = newBlockHeaderElectrum(¬e.blockHeader, uint32(note.Height)) + note.blockHeaderStr = hex.EncodeToString(note.blockHeader[:]) + } + case hashXNotification: + note, _ := notification.(hashXNotification) + hashXSubs, ok := sm.hashXSubs[note.hashX] + if ok { + subsCopy = hashXSubs + } + if len(subsCopy) > 0 { + note.statusStr = hex.EncodeToString(note.status) + } + default: + log.Warnf("unknown notification type: %v", notification) + } + sm.sessionsMut.RUnlock() + + // Deliver notification to relevant sessions. + for _, sess := range subsCopy { + sess.doNotify(notification) + } +} + +type SessionServerCodec struct { + rpc.ServerCodec + sess *session +} + +// ReadRequestHeader provides ability to rewrite the incoming +// request "method" field. For example: +// blockchain.block.get_header -> blockchain.block.Get_header +// blockchain.address.listunspent -> blockchain.address.Listunspent +// This makes the "method" string compatible with rpc.Server +// requirements. +func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error { + log.Infof("receive header from %v", c.sess.addr.String()) + err := c.ServerCodec.ReadRequestHeader(req) + if err != nil { + log.Warnf("error: %v", err) + return err + } + rawMethod := req.ServiceMethod + parts := strings.Split(rawMethod, ".") + if len(parts) < 2 { + return fmt.Errorf("blockchain rpc: service/method ill-formed: %q", rawMethod) + } + service := strings.Join(parts[0:len(parts)-1], ".") + method := parts[len(parts)-1] + if len(method) < 1 { + return fmt.Errorf("blockchain rpc: method ill-formed: %q", method) + } + method = strings.ToUpper(string(method[0])) + string(method[1:]) + req.ServiceMethod = service + "." + method + return err +} + +// ReadRequestBody wraps the regular implementation, but updates session stats too. +func (c *SessionServerCodec) ReadRequestBody(params any) error { + err := c.ServerCodec.ReadRequestBody(params) + if err != nil { + log.Warnf("error: %v", err) + return err + } + log.Infof("receive body from %v", c.sess.addr.String()) + // Bump last receive time. + c.sess.lastRecv = time.Now() + return err +} + +// WriteResponse wraps the regular implementation, but updates session stats too. +func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error { + log.Infof("respond to %v", c.sess.addr.String()) + err := c.ServerCodec.WriteResponse(resp, reply) + if err != nil { + return err + } + // Bump last send time. + c.sess.lastSend = time.Now() + return err +}