WIP: Json rpc federation, search/getclaimbyid, and shutdown #76

Merged
jeffreypicard merged 11 commits from json-rpc-federation-and-shutdown into master 2022-12-07 17:01:36 +01:00
8 changed files with 174 additions and 11 deletions
Showing only changes of commit 6a28d809c2 - Show all commits

View file

@ -958,7 +958,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
}
blockTxsCache[txHeight] = txs
}
blockTxs, _ := blockTxsCache[txHeight]
blockTxs := blockTxsCache[txHeight]
results = append(results, TxMerkle{
TxHash: txNumKey.TxHash,
RawTx: txVal.RawTx,
@ -970,6 +970,83 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
return results, nil
}
func (db *ReadOnlyDBColumnFamily) GetClaimByID(claimID string) ([]*ExpandedResolveResult, []*ExpandedResolveResult, error) {
// claim, err := db.GetCachedClaimTxo(claimID, true)
// if err != nil {
// return nil, err
// }
// activation, err := db.GetActivation(claim.TxNum, claim.Position)
// if err != nil {
// return nil, err
// }
// return PrepareResolveResult(
// db,
// claim.TxNum,
// claim.Position,
// claimID,
// claim.Name,
// claim.RootTxNum,
// claim.RootPosition,
// activation,
// claim.ChannelSignatureIsValid,
// )
/*
def _getclaimbyid(self, claim_id: str):
rows = []
extra = []
claim_hash = bytes.fromhex(claim_id)
stream = self.db._fs_get_claim_by_hash(claim_hash)
rows.append(stream or LookupError(f"Could not find claim at {claim_id}"))
if stream and stream.channel_hash:
channel = self.db._fs_get_claim_by_hash(stream.channel_hash)
extra.append(channel or LookupError(f"Could not find channel at {stream.channel_hash.hex()}"))
if stream and stream.reposted_claim_hash:
repost = self.db._fs_get_claim_by_hash(stream.reposted_claim_hash)
if repost:
extra.append(repost)
return Outputs.to_base64(rows, extra, 0, None, None)
*/
rows := make([]*ExpandedResolveResult, 0)
extras := make([]*ExpandedResolveResult, 0)
claimHash, err := hex.DecodeString(claimID)
if err != nil {
return nil, nil, err
}
stream, err := db.FsGetClaimByHash(claimHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Stream = &optionalResolveResultOrError{res: stream}
rows = append(rows, res)
if stream != nil && stream.ChannelHash != nil {
channel, err := db.FsGetClaimByHash(stream.ChannelHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Channel = &optionalResolveResultOrError{res: channel}
extras = append(extras, res)
}
if stream != nil && stream.RepostedClaimHash != nil {
repost, err := db.FsGetClaimByHash(stream.RepostedClaimHash)
if err != nil {
return nil, nil, err
}
var res = NewExpandedResolveResult()
res.Repost = &optionalResolveResultOrError{res: repost}
extras = append(extras, res)
}
return rows, extras, nil
}
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
handle, err := db.EnsureHandle(prefixes.DBState)
if err != nil {

View file

@ -204,7 +204,7 @@ func TestHeadersSubscribe(t *testing.T) {
return
}
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()
@ -385,7 +385,7 @@ func TestAddressSubscribe(t *testing.T) {
return
}
sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams)
sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams)
sm.start()
defer sm.stop()

View file

@ -1,13 +1,20 @@
package server
import (
"context"
"fmt"
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
type ClaimtrieService struct {
DB *db.ReadOnlyDBColumnFamily
DB *db.ReadOnlyDBColumnFamily
Server *Server
}
type ResolveData struct {
@ -18,6 +25,10 @@ type Result struct {
Data string `json:"data"`
}
type GetClaimByIDData struct {
ClaimID string `json:"claim_id"`
}
// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error {
log.Println("Resolve")
@ -25,3 +36,74 @@ func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error
*result = res
return err
}
// Search is the json rpc endpoint for 'blockchain.claimtrie.search'.
func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error {
log.Println("Search")
if t.Server == nil {
log.Warnln("Server is nil in Search")
*result = nil
return nil
}
ctx := context.Background()
res, err := t.Server.Search(ctx, args)
*result = res
return err
}
// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'.
func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error {
log.Println("GetClaimByID")
if len(args.ClaimID) != 40 {
*result = nil
return fmt.Errorf("len(claim_id) != 40")
}
rows, extras, err := t.DB.GetClaimByID(args.ClaimID)
if err != nil {
*result = nil
return err
}
metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc()
// FIXME: this has txos and extras and so does GetClaimById?
moodyjon commented 2022-11-16 18:36:00 +01:00 (Migrated from github.com)
Review

OK, I see why you pass the server.Server into the session manager, and also store it in ClaimtrieService.

The python hub has a SearchIndex class which contains the search functionality of the HubServerService: 75d64f9dc6/hub/herald/search.py (L29)

The SearchIndex is passed off to the session manager instead of the whole HubServerService:
https://github.com/lbryio/hub/search?q=SearchIndex

Low priority to do this now, but splitting the search functionality (elastic search client, caches for search, etc) from server.Server into a SearchIndex struct/obj would be nice.

OK, I see why you pass the `server.Server` into the session manager, and also store it in `ClaimtrieService`. The python hub has a `SearchIndex` class which contains the search functionality of the `HubServerService`: https://github.com/lbryio/hub/blob/75d64f9dc6d3b2c913b8b10053bd3589e7a2e8eb/hub/herald/search.py#L29 The `SearchIndex` is passed off to the session manager instead of the whole `HubServerService`: https://github.com/lbryio/hub/search?q=SearchIndex Low priority to do this now, but splitting the search functionality (elastic search client, caches for search, etc) from `server.Server` into a `SearchIndex` struct/obj would be nice.
jeffreypicard commented 2022-11-17 23:38:50 +01:00 (Migrated from github.com)
Review

Yup, this definitely needs to be refactored in the future.

Yup, this definitely needs to be refactored in the future.
allTxos := make([]*pb.Output, 0)
allExtraTxos := make([]*pb.Output, 0)
for _, row := range rows {
txos, extraTxos, err := row.ToOutputs()
if err != nil {
*result = nil
return err
}
// TODO: there may be a more efficient way to do this.
allTxos = append(allTxos, txos...)
allExtraTxos = append(allExtraTxos, extraTxos...)
}
for _, extra := range extras {
txos, extraTxos, err := extra.ToOutputs()
if err != nil {
*result = nil
return err
}
// TODO: there may be a more efficient way to do this.
allTxos = append(allTxos, txos...)
allExtraTxos = append(allExtraTxos, extraTxos...)
}
res := &pb.Outputs{
Txos: allTxos,
ExtraTxos: allExtraTxos,
Total: uint32(len(allTxos) + len(allExtraTxos)),
Offset: 0, //TODO
Blocked: nil, //TODO
BlockedTotal: 0, //TODO
}
logrus.Warn(res)
*result = res
return nil
}

View file

@ -15,7 +15,7 @@ type PeersSubscribeReq struct {
moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
type PeersSubscribeResp struct{}
// Features is the json rpc endpoint for 'server.peers.subcribe'.
func (t *ServerService) PeersSubscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error {
moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error {
moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
log.Println("PeersSubscribe")
*res = nil

moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah
moodyjon commented 2022-11-17 19:39:11 +01:00 (Migrated from github.com)
Review

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have sessionManager.peersSubs already in place.

Might as well complete this by adding sessionManager to PeersService (or other appropriate pointer). You have `sessionManager.peersSubs` already in place.
moodyjon commented 2022-11-19 01:10:46 +01:00 (Migrated from github.com)
Review

Ahh.... I meant register the subscription in the sessionManager.peerSubs map. It needs registration there as it's communicating by JSONRPC. Then inside Server.notifyPeerSubs, call s.sessionManager.doNotify() to forward a notification to the session-based mechanism.

The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping Server.PeerSubs for those.

Ahh.... I meant register the subscription in the `sessionManager.peerSubs` map. It needs registration there as it's communicating by JSONRPC. Then inside `Server.notifyPeerSubs`, call `s.sessionManager.doNotify()` to forward a notification to the session-based mechanism. The stuff in federation.go is more oriented towards grpc service (and managing the list of known peers). If PeersSubscribe comes in via GRPC, there's a separate mapping `Server.PeerSubs` for those.
jeffreypicard commented 2022-11-19 08:34:17 +01:00 (Migrated from github.com)
Review

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

Ahh, yeah, just hooking it into the gRPC like this isn't going to quite work I suppose hahah

View file

@ -91,7 +91,7 @@ fail1:
s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json")
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{s.DB}
claimtrieSvc := &ClaimtrieService{s.DB, s}
err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)

View file

@ -362,8 +362,10 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
HeightSubsMut: sync.RWMutex{},
NotifierChan: make(chan interface{}, 1),
Grp: grp,
sessionManager: newSessionManager(myDB, args, sessionGrp, &chain),
sessionManager: nil,
}
// FIXME: HACK
s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain)
// Start up our background services
if !args.DisableResolve && !args.DisableRocksDBRefresh {

View file

@ -14,6 +14,6 @@ func (s *Server) GetNumPeersExported() func() int64 {
return s.getNumPeers
}
func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(db, args, grp, chain)
func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return newSessionManager(server, db, args, grp, chain)
}

View file

@ -138,6 +138,7 @@ type sessionManager struct {
manageTicker *time.Ticker
db *db.ReadOnlyDBColumnFamily
args *Args
server *Server
chain *chaincfg.Params
// peerSubs are sessions subscribed via 'blockchain.peers.subscribe'
peerSubs sessionMap
@ -147,7 +148,7 @@ type sessionManager struct {
hashXSubs map[[HASHX_LEN]byte]sessionMap
}
func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager {
return &sessionManager{
sessions: make(sessionMap),
grp: grp,
@ -156,6 +157,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou
manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second),
db: db,
args: args,
server: server,
chain: chain,
peerSubs: make(sessionMap),
headerSubs: make(sessionMap),
@ -234,7 +236,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
}
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db}
claimtrieSvc := &ClaimtrieService{sm.db, sm.server}
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil {
log.Errorf("RegisterName: %v\n", err)