From 6a28d809c28c476cb1c586c1e0c63c002911994a Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sun, 13 Nov 2022 11:47:55 +0000 Subject: [PATCH] getclaimbyid and search --- db/db_get.go | 79 ++++++++++++++++++++++++++++- server/jsonrpc_blockchain_test.go | 4 +- server/jsonrpc_claimtrie.go | 84 ++++++++++++++++++++++++++++++- server/jsonrpc_federation.go | 2 +- server/jsonrpc_service.go | 2 +- server/server.go | 4 +- server/server_test_pkg.go | 4 +- server/session.go | 6 ++- 8 files changed, 174 insertions(+), 11 deletions(-) diff --git a/db/db_get.go b/db/db_get.go index b6f0c10..f11c953 100644 --- a/db/db_get.go +++ b/db/db_get.go @@ -958,7 +958,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM } blockTxsCache[txHeight] = txs } - blockTxs, _ := blockTxsCache[txHeight] + blockTxs := blockTxsCache[txHeight] results = append(results, TxMerkle{ TxHash: txNumKey.TxHash, RawTx: txVal.RawTx, @@ -970,6 +970,83 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM return results, nil } +func (db *ReadOnlyDBColumnFamily) GetClaimByID(claimID string) ([]*ExpandedResolveResult, []*ExpandedResolveResult, error) { + // claim, err := db.GetCachedClaimTxo(claimID, true) + // if err != nil { + // return nil, err + // } + + // activation, err := db.GetActivation(claim.TxNum, claim.Position) + // if err != nil { + // return nil, err + // } + + // return PrepareResolveResult( + // db, + // claim.TxNum, + // claim.Position, + // claimID, + // claim.Name, + // claim.RootTxNum, + // claim.RootPosition, + // activation, + // claim.ChannelSignatureIsValid, + // ) + + /* + def _getclaimbyid(self, claim_id: str): + rows = [] + extra = [] + claim_hash = bytes.fromhex(claim_id) + stream = self.db._fs_get_claim_by_hash(claim_hash) + rows.append(stream or LookupError(f"Could not find claim at {claim_id}")) + if stream and stream.channel_hash: + channel = self.db._fs_get_claim_by_hash(stream.channel_hash) + extra.append(channel or LookupError(f"Could not find channel at {stream.channel_hash.hex()}")) + if stream and stream.reposted_claim_hash: + repost = self.db._fs_get_claim_by_hash(stream.reposted_claim_hash) + if repost: + extra.append(repost) + return Outputs.to_base64(rows, extra, 0, None, None) + */ + rows := make([]*ExpandedResolveResult, 0) + extras := make([]*ExpandedResolveResult, 0) + claimHash, err := hex.DecodeString(claimID) + if err != nil { + return nil, nil, err + } + + stream, err := db.FsGetClaimByHash(claimHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Stream = &optionalResolveResultOrError{res: stream} + rows = append(rows, res) + + if stream != nil && stream.ChannelHash != nil { + channel, err := db.FsGetClaimByHash(stream.ChannelHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Channel = &optionalResolveResultOrError{res: channel} + extras = append(extras, res) + } + + if stream != nil && stream.RepostedClaimHash != nil { + repost, err := db.FsGetClaimByHash(stream.RepostedClaimHash) + if err != nil { + return nil, nil, err + } + var res = NewExpandedResolveResult() + res.Repost = &optionalResolveResultOrError{res: repost} + extras = append(extras, res) + } + + return rows, extras, nil +} + func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) { handle, err := db.EnsureHandle(prefixes.DBState) if err != nil { diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index 4616d20..634f027 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -204,7 +204,7 @@ func TestHeadersSubscribe(t *testing.T) { return } - sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) + sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() @@ -385,7 +385,7 @@ func TestAddressSubscribe(t *testing.T) { return } - sm := newSessionManager(db, args, grp, &chaincfg.RegressionNetParams) + sm := newSessionManager(nil, db, args, grp, &chaincfg.RegressionNetParams) sm.start() defer sm.stop() diff --git a/server/jsonrpc_claimtrie.go b/server/jsonrpc_claimtrie.go index a8ef42b..33301b0 100644 --- a/server/jsonrpc_claimtrie.go +++ b/server/jsonrpc_claimtrie.go @@ -1,13 +1,20 @@ package server import ( + "context" + "fmt" + "github.com/lbryio/herald.go/db" + "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) type ClaimtrieService struct { - DB *db.ReadOnlyDBColumnFamily + DB *db.ReadOnlyDBColumnFamily + Server *Server } type ResolveData struct { @@ -18,6 +25,10 @@ type Result struct { Data string `json:"data"` } +type GetClaimByIDData struct { + ClaimID string `json:"claim_id"` +} + // Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'. func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error { log.Println("Resolve") @@ -25,3 +36,74 @@ func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error *result = res return err } + +// Search is the json rpc endpoint for 'blockchain.claimtrie.search'. +func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error { + log.Println("Search") + if t.Server == nil { + log.Warnln("Server is nil in Search") + *result = nil + return nil + } + ctx := context.Background() + res, err := t.Server.Search(ctx, args) + *result = res + return err +} + +// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'. +func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error { + log.Println("GetClaimByID") + if len(args.ClaimID) != 40 { + *result = nil + return fmt.Errorf("len(claim_id) != 40") + } + + rows, extras, err := t.DB.GetClaimByID(args.ClaimID) + if err != nil { + *result = nil + return err + } + + metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc() + + // FIXME: this has txos and extras and so does GetClaimById? + allTxos := make([]*pb.Output, 0) + allExtraTxos := make([]*pb.Output, 0) + + for _, row := range rows { + txos, extraTxos, err := row.ToOutputs() + if err != nil { + *result = nil + return err + } + // TODO: there may be a more efficient way to do this. + allTxos = append(allTxos, txos...) + allExtraTxos = append(allExtraTxos, extraTxos...) + } + + for _, extra := range extras { + txos, extraTxos, err := extra.ToOutputs() + if err != nil { + *result = nil + return err + } + // TODO: there may be a more efficient way to do this. + allTxos = append(allTxos, txos...) + allExtraTxos = append(allExtraTxos, extraTxos...) + } + + res := &pb.Outputs{ + Txos: allTxos, + ExtraTxos: allExtraTxos, + Total: uint32(len(allTxos) + len(allExtraTxos)), + Offset: 0, //TODO + Blocked: nil, //TODO + BlockedTotal: 0, //TODO + } + + logrus.Warn(res) + + *result = res + return nil +} diff --git a/server/jsonrpc_federation.go b/server/jsonrpc_federation.go index 4ae8870..9dcd203 100644 --- a/server/jsonrpc_federation.go +++ b/server/jsonrpc_federation.go @@ -15,7 +15,7 @@ type PeersSubscribeReq struct { type PeersSubscribeResp struct{} // Features is the json rpc endpoint for 'server.peers.subcribe'. -func (t *ServerService) PeersSubscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error { +func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error { log.Println("PeersSubscribe") *res = nil diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index 47f5d72..9952fea 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -91,7 +91,7 @@ fail1: s1.RegisterCodec(&gorillaRpcCodec{gorilla_json.NewCodec()}, "application/json") // Register "blockchain.claimtrie.*"" handlers. - claimtrieSvc := &ClaimtrieService{s.DB} + claimtrieSvc := &ClaimtrieService{s.DB, s} err := s1.RegisterTCPService(claimtrieSvc, "blockchain_claimtrie") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) diff --git a/server/server.go b/server/server.go index 7af55ac..3227799 100644 --- a/server/server.go +++ b/server/server.go @@ -362,8 +362,10 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { HeightSubsMut: sync.RWMutex{}, NotifierChan: make(chan interface{}, 1), Grp: grp, - sessionManager: newSessionManager(myDB, args, sessionGrp, &chain), + sessionManager: nil, } + // FIXME: HACK + s.sessionManager = newSessionManager(s, myDB, args, sessionGrp, &chain) // Start up our background services if !args.DisableResolve && !args.DisableRocksDBRefresh { diff --git a/server/server_test_pkg.go b/server/server_test_pkg.go index 31230c7..3d64879 100644 --- a/server/server_test_pkg.go +++ b/server/server_test_pkg.go @@ -14,6 +14,6 @@ func (s *Server) GetNumPeersExported() func() int64 { return s.getNumPeers } -func NewSessionManagerExported(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { - return newSessionManager(db, args, grp, chain) +func NewSessionManagerExported(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { + return newSessionManager(server, db, args, grp, chain) } diff --git a/server/session.go b/server/session.go index 2d0107c..4b358bd 100644 --- a/server/session.go +++ b/server/session.go @@ -138,6 +138,7 @@ type sessionManager struct { manageTicker *time.Ticker db *db.ReadOnlyDBColumnFamily args *Args + server *Server chain *chaincfg.Params // peerSubs are sessions subscribed via 'blockchain.peers.subscribe' peerSubs sessionMap @@ -147,7 +148,7 @@ type sessionManager struct { hashXSubs map[[HASHX_LEN]byte]sessionMap } -func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { +func newSessionManager(server *Server, db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Group, chain *chaincfg.Params) *sessionManager { return &sessionManager{ sessions: make(sessionMap), grp: grp, @@ -156,6 +157,7 @@ func newSessionManager(db *db.ReadOnlyDBColumnFamily, args *Args, grp *stop.Grou manageTicker: time.NewTicker(time.Duration(max(5, args.SessionTimeout/20)) * time.Second), db: db, args: args, + server: server, chain: chain, peerSubs: make(sessionMap), headerSubs: make(sessionMap), @@ -234,7 +236,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { } // Register "blockchain.claimtrie.*"" handlers. - claimtrieSvc := &ClaimtrieService{sm.db} + claimtrieSvc := &ClaimtrieService{sm.db, sm.server} err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc) if err != nil { log.Errorf("RegisterName: %v\n", err)