Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2022-11-13 12:47:55 +01:00
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
"github.com/lbryio/herald.go/db"
|
2022-11-13 12:47:55 +01:00
|
|
|
"github.com/lbryio/herald.go/internal/metrics"
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
2022-11-13 12:47:55 +01:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
)
|
|
|
|
|
|
|
|
type ClaimtrieService struct {
|
2022-11-13 12:47:55 +01:00
|
|
|
DB *db.ReadOnlyDBColumnFamily
|
|
|
|
Server *Server
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
type ResolveData struct {
|
|
|
|
Data []string `json:"data"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type Result struct {
|
|
|
|
Data string `json:"data"`
|
|
|
|
}
|
|
|
|
|
2022-11-13 12:47:55 +01:00
|
|
|
type GetClaimByIDData struct {
|
|
|
|
ClaimID string `json:"claim_id"`
|
|
|
|
}
|
|
|
|
|
Add subscribe/unsubscribe RPCs. Add session, sessionManager, and serve JSON RPC (without HTTP). (#66)
* Move and rename BlockchainCodec, BlockchainCodecRequest.
These are not specifically "blockchain", rather they are
specific to how gorilla/rpc works.
* Move claimtrie-related service/handlers to jsonrpc_claimtrie.go.
* Pull out decode logic into named func newBlockHeaderElectrum().
* Rename BlockchainService -> BlockchainBlockService.
* Drop http.Request arg from handlers, and use RegisterTCPService().
* Implement GetStatus() to pull data from HashXStatus table.
* Make the service objects independent, so we don't have inheritance.
* Add core session/subscription logic (session.go).
Implement subsribe/unsubscribe handlers.
* Support both pure JSON and JSON-over-HTTP services.
Forward NotifierChan messages to sessionManager.
* Only assign default port (50001) if neither --json-rpc-port nor
--json-rpc-http-port are specified.
* Handle failures with goto instead of break. Update error logging.
* Add --max-sessions, --session-timeout args. Enforce max sessions.
* Changes to make session.go testable. Conn created with Pipe()
used in testing has no unique Addr.
* Add tests for headers, headers.subscribe, address.subscribe.
* HashXStatus, HashXMempoolStatus not populated by default. Fix GetStatus().
* Use time.Ticker object to drive management activity.
2022-10-04 16:05:06 +02:00
|
|
|
// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
|
|
|
|
func (t *ClaimtrieService) Resolve(args *ResolveData, result **pb.Outputs) error {
|
|
|
|
log.Println("Resolve")
|
|
|
|
res, err := InternalResolve(args.Data, t.DB)
|
|
|
|
*result = res
|
|
|
|
return err
|
|
|
|
}
|
2022-11-13 12:47:55 +01:00
|
|
|
|
|
|
|
// Search is the json rpc endpoint for 'blockchain.claimtrie.search'.
|
|
|
|
func (t *ClaimtrieService) Search(args *pb.SearchRequest, result **pb.Outputs) error {
|
|
|
|
log.Println("Search")
|
|
|
|
if t.Server == nil {
|
|
|
|
log.Warnln("Server is nil in Search")
|
|
|
|
*result = nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
res, err := t.Server.Search(ctx, args)
|
|
|
|
*result = res
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetClaimByID is the json rpc endpoint for 'blockchain.claimtrie.getclaimbyid'.
|
|
|
|
func (t *ClaimtrieService) GetClaimByID(args *GetClaimByIDData, result **pb.Outputs) error {
|
|
|
|
log.Println("GetClaimByID")
|
|
|
|
if len(args.ClaimID) != 40 {
|
|
|
|
*result = nil
|
|
|
|
return fmt.Errorf("len(claim_id) != 40")
|
|
|
|
}
|
|
|
|
|
|
|
|
rows, extras, err := t.DB.GetClaimByID(args.ClaimID)
|
|
|
|
if err != nil {
|
|
|
|
*result = nil
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
metrics.RequestsCount.With(prometheus.Labels{"method": "blockchain.claimtrie.getclaimbyid"}).Inc()
|
|
|
|
|
|
|
|
// FIXME: this has txos and extras and so does GetClaimById?
|
|
|
|
allTxos := make([]*pb.Output, 0)
|
|
|
|
allExtraTxos := make([]*pb.Output, 0)
|
|
|
|
|
|
|
|
for _, row := range rows {
|
|
|
|
txos, extraTxos, err := row.ToOutputs()
|
|
|
|
if err != nil {
|
|
|
|
*result = nil
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// TODO: there may be a more efficient way to do this.
|
|
|
|
allTxos = append(allTxos, txos...)
|
|
|
|
allExtraTxos = append(allExtraTxos, extraTxos...)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, extra := range extras {
|
|
|
|
txos, extraTxos, err := extra.ToOutputs()
|
|
|
|
if err != nil {
|
|
|
|
*result = nil
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// TODO: there may be a more efficient way to do this.
|
|
|
|
allTxos = append(allTxos, txos...)
|
|
|
|
allExtraTxos = append(allExtraTxos, extraTxos...)
|
|
|
|
}
|
|
|
|
|
|
|
|
res := &pb.Outputs{
|
|
|
|
Txos: allTxos,
|
|
|
|
ExtraTxos: allExtraTxos,
|
|
|
|
Total: uint32(len(allTxos) + len(allExtraTxos)),
|
|
|
|
Offset: 0, //TODO
|
|
|
|
Blocked: nil, //TODO
|
|
|
|
BlockedTotal: 0, //TODO
|
|
|
|
}
|
|
|
|
|
2022-12-07 13:46:51 +01:00
|
|
|
log.Warn(res)
|
2022-11-13 12:47:55 +01:00
|
|
|
|
|
|
|
*result = res
|
|
|
|
return nil
|
|
|
|
}
|