WIP: Resolve json rpc (#57)

* jsonrpc

* update readme for open file limits

* add CGO flags to readme

* remove uneeded logging

* don't start jsonrpc server in unit tests

* cleanup and add args for json rpc

* correct rpc default port

* remove unused test_rpc.sh script

Co-authored-by: Ubuntu <ubuntu@ns5010184.ip-15-235-15.net>
This commit is contained in:
Jeffrey Picard 2022-09-07 21:36:07 +03:00 committed by GitHub
parent 09fd939b60
commit 9403d84a83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 127 additions and 10 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
.idea/
.vscode/
.DS_Store
.venv

View file

@ -175,8 +175,8 @@ func (db *ReadOnlyDBColumnFamily) ResolveParsedUrl(parsed *PathSegment) (*Resolv
for kv := range ch {
key := kv.Key.(*prefixes.ClaimTakeoverKey)
val := kv.Value.(*prefixes.ClaimTakeoverValue)
log.Warnf("ClaimTakeoverKey: %#v", key)
log.Warnf("ClaimTakeoverValue: %#v", val)
log.Tracef("ClaimTakeoverKey: %#v", key)
log.Tracef("ClaimTakeoverValue: %#v", val)
}
controlling, err := db.GetControllingClaim(normalizedName)
log.Warnf("controlling: %#v", controlling)

2
go.mod
View file

@ -8,6 +8,8 @@ require (
github.com/ReneKroon/ttlcache/v2 v2.8.1
github.com/akamensky/argparse v1.2.2
github.com/go-restruct/restruct v1.2.0-alpha
github.com/gorilla/mux v1.7.3
github.com/gorilla/rpc v1.2.0
github.com/lbryio/lbry.go/v3 v3.0.1-beta
github.com/linxGnu/grocksdb v1.6.42
github.com/olivere/elastic/v7 v7.0.24

8
go.sum
View file

@ -235,8 +235,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@ -252,7 +252,9 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gopherjs/gopherjs v0.0.0-20190915194858-d3ddacdb130f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@ -764,7 +766,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211123173158-ef496fb156ab h1:rfJ1bsoJQQIAoAxTxB7bme+vHrNkRw8CqfsYh9w54cw=
golang.org/x/sys v0.0.0-20211123173158-ef496fb156ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -817,12 +818,11 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023 h1:0c3L82FDQ5rt1bjTBlchS8t6RQ6299/+5bWMnRLh+uI=
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=

View file

@ -39,9 +39,13 @@ func main() {
defer func() {
log.Println("Shutting down server...")
if !s.Args.DisableEs {
s.EsClient.Stop()
}
s.GrpcServer.GracefulStop()
if !s.Args.DisableResolve {
s.DB.Shutdown()
}
log.Println("Returning from main...")
}()

View file

@ -72,6 +72,8 @@ tar xfzv rocksdb-6.29.5.tar.gz
cd rocksdb-6.29.5
make static_lib
sudo make install
export CGO_CFLAGS="-I/usr/local/lib"
export CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lstdc++ -lm -lz -lsnappy -llz4 -lzstd -lbz2"
```
```
@ -80,6 +82,19 @@ https://github.com/protocolbuffers/protobuf/releases/download/v3.17.1/protobuf-a
If you can run `./protobuf/build.sh` without errors, you have `go` and `protoc` installed correctly.
On Linux you probably need to instead the open file limits
```
ulimit -n 1000000
sysctl -w fs.file-max=1000000
```
and `/etc/security/limits.conf` or `/etc/sysctl.conf` change:
```
fs.file-max = 1000000
```
Finally, run the block processor as described under Usage.
### Running from Source

16
requirements.txt Normal file
View file

@ -0,0 +1,16 @@
certifi==2022.6.15
cffi==1.15.1
charset-normalizer==2.1.0
cryptography==37.0.4
github3.py==3.2.0
grpcio==1.47.0
grpcio-tools==1.47.0
idna==3.3
protobuf==3.20.1
pycparser==2.21
PyJWT==2.4.0
python-dateutil==2.8.2
requests==2.28.1
six==1.16.0
uritemplate==4.1.1
urllib3==1.26.11

View file

@ -25,6 +25,7 @@ type Args struct {
EsPort string
PrometheusPort string
NotifierPort string
JSONRPCPort string
EsIndex string
RefreshDelta int
CacheTTL int
@ -43,6 +44,7 @@ type Args struct {
DisableResolve bool
DisableBlockingAndFiltering bool
DisableStartNotifier bool
DisableStartJSONRPC bool
}
const (
@ -54,6 +56,7 @@ const (
DefaultEsPort = "9200"
DefaultPrometheusPort = "2112"
DefaultNotifierPort = "18080"
DefaultJSONRPCPort = "50001"
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
@ -67,6 +70,7 @@ const (
DefaultDisableResolve = false
DefaultDisableBlockingAndFiltering = false
DisableStartNotifier = false
DisableStartJSONRPC = false
)
var (
@ -112,6 +116,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort})
jsonRPCPort := parser.String("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Default: DefaultJSONRPCPort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL})
@ -131,6 +136,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
disableResolve := parser.Flag("", "disable-resolve", &argparse.Options{Required: false, Help: "Disable resolve endpoint (and rocksdb loading)", Default: DefaultDisableRockDBRefresh})
disableBlockingAndFiltering := parser.Flag("", "disable-blocking-and-filtering", &argparse.Options{Required: false, Help: "Disable blocking and filtering of channels and streams", Default: DefaultDisableBlockingAndFiltering})
disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier})
disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -157,6 +163,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
EsPort: *esPort,
PrometheusPort: *prometheusPort,
NotifierPort: *notifierPort,
JSONRPCPort: *jsonRPCPort,
EsIndex: *esIndex,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
@ -175,6 +182,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
DisableResolve: *disableResolve,
DisableBlockingAndFiltering: *disableBlockingAndFiltering,
DisableStartNotifier: *disableStartNotifier,
DisableStartJSONRPC: *disableStartJSONRPC,
}
if esHost, ok := environment["ELASTIC_HOST"]; ok {

View file

@ -55,6 +55,7 @@ func makeDefaultArgs() *server.Args {
EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort,
NotifierPort: server.DefaultNotifierPort,
JSONRPCPort: server.DefaultJSONRPCPort,
EsIndex: server.DefaultEsIndex,
RefreshDelta: server.DefaultRefreshDelta,
CacheTTL: server.DefaultCacheTTL,
@ -70,6 +71,7 @@ func makeDefaultArgs() *server.Args {
DisableResolve: true,
DisableBlockingAndFiltering: true,
DisableStartNotifier: true,
DisableStartJSONRPC: true,
}
return args

50
server/jsonrpc_service.go Normal file
View file

@ -0,0 +1,50 @@
package server
import (
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/gorilla/rpc"
"github.com/gorilla/rpc/json"
"github.com/lbryio/herald.go/db"
pb "github.com/lbryio/herald.go/protobuf/go"
)
type JSONServer struct {
DB *db.ReadOnlyDBColumnFamily
}
type ResolveData struct {
Data []string `json:"data"`
}
type Result struct {
Data string `json:"data"`
}
// Resolve is the json rpc endpoint for resolve
func (t *JSONServer) Resolve(r *http.Request, args *ResolveData, result **pb.Outputs) error {
log.Println("Resolve")
res, err := InternalResolve(args.Data, t.DB)
*result = res
return err
}
// StartJsonRPC starts the json rpc server and registers the endpoints.
func (s *Server) StartJsonRPC() error {
server := new(JSONServer)
server.DB = s.DB
port := ":" + s.Args.JSONRPCPort
s1 := rpc.NewServer() // Create a new RPC server
s1.RegisterCodec(json.NewCodec(), "application/json") // Register the type of data requested as JSON
s1.RegisterService(server, "") // Register the service by creating a new JSON server
r := mux.NewRouter()
r.Handle("/rpc", s1)
log.Fatal(http.ListenAndServe(port, r))
return nil
}

View file

@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"hash"
"io/ioutil"
@ -307,6 +308,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}
}()
}
if !args.DisableStartJSONRPC {
go func() {
err := s.StartJsonRPC()
if err != nil {
log.Println("JSONRPC Server failed!", err)
}
}()
}
// Load peers from disk and subscribe to one if there are any
if !args.DisableLoadPeers {
go func() {
@ -446,14 +455,24 @@ func (s *Server) HeightHashSubscribe() error {
return nil
}
// Resolve is the gRPC endpoint for resolve.
func (s *Server) Resolve(ctx context.Context, args *pb.StringArray) (*pb.Outputs, error) {
return InternalResolve(args.Value, s.DB)
}
// InternalResolve takes an array of urls and resolves them to their transactions.
func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs, error) {
if DB == nil {
return nil, errors.New("db is nil")
// return nil, nil
}
metrics.RequestsCount.With(prometheus.Labels{"method": "resolve"}).Inc()
allTxos := make([]*pb.Output, 0)
allExtraTxos := make([]*pb.Output, 0)
for _, url := range args.Value {
res := s.DB.Resolve(url)
for _, url := range urls {
res := DB.Resolve(url)
txos, extraTxos, err := res.ToOutputs()
if err != nil {
return nil, err