From 9403d84a8352dd240caf2f2c788db10f52529185 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 7 Sep 2022 21:36:07 +0300 Subject: [PATCH] WIP: Resolve json rpc (#57) * jsonrpc * update readme for open file limits * add CGO flags to readme * remove uneeded logging * don't start jsonrpc server in unit tests * cleanup and add args for json rpc * correct rpc default port * remove unused test_rpc.sh script Co-authored-by: Ubuntu --- .gitignore | 1 + db/db_resolve.go | 4 ++-- go.mod | 2 ++ go.sum | 8 +++---- main.go | 8 +++++-- readme.md | 15 ++++++++++++ requirements.txt | 16 +++++++++++++ server/args.go | 8 +++++++ server/federation_test.go | 2 ++ server/jsonrpc_service.go | 50 +++++++++++++++++++++++++++++++++++++++ server/server.go | 23 ++++++++++++++++-- 11 files changed, 127 insertions(+), 10 deletions(-) create mode 100644 requirements.txt create mode 100644 server/jsonrpc_service.go diff --git a/.gitignore b/.gitignore index 9f1c927..90c3cea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea/ .vscode/ .DS_Store +.venv diff --git a/db/db_resolve.go b/db/db_resolve.go index 9020791..a2f34e0 100644 --- a/db/db_resolve.go +++ b/db/db_resolve.go @@ -175,8 +175,8 @@ func (db *ReadOnlyDBColumnFamily) ResolveParsedUrl(parsed *PathSegment) (*Resolv for kv := range ch { key := kv.Key.(*prefixes.ClaimTakeoverKey) val := kv.Value.(*prefixes.ClaimTakeoverValue) - log.Warnf("ClaimTakeoverKey: %#v", key) - log.Warnf("ClaimTakeoverValue: %#v", val) + log.Tracef("ClaimTakeoverKey: %#v", key) + log.Tracef("ClaimTakeoverValue: %#v", val) } controlling, err := db.GetControllingClaim(normalizedName) log.Warnf("controlling: %#v", controlling) diff --git a/go.mod b/go.mod index 2db271e..5e6cfcf 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 github.com/go-restruct/restruct v1.2.0-alpha + github.com/gorilla/mux v1.7.3 + github.com/gorilla/rpc v1.2.0 github.com/lbryio/lbry.go/v3 v3.0.1-beta github.com/linxGnu/grocksdb v1.6.42 github.com/olivere/elastic/v7 v7.0.24 diff --git a/go.sum b/go.sum index d52f4a5..c6d18ed 100644 --- a/go.sum +++ b/go.sum @@ -235,8 +235,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -252,7 +252,9 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gopherjs/gopherjs v0.0.0-20190915194858-d3ddacdb130f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk= github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= @@ -764,7 +766,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211123173158-ef496fb156ab h1:rfJ1bsoJQQIAoAxTxB7bme+vHrNkRw8CqfsYh9w54cw= golang.org/x/sys v0.0.0-20211123173158-ef496fb156ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -817,12 +818,11 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023 h1:0c3L82FDQ5rt1bjTBlchS8t6RQ6299/+5bWMnRLh+uI= golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= diff --git a/main.go b/main.go index 0c47cbd..8dc1053 100644 --- a/main.go +++ b/main.go @@ -39,9 +39,13 @@ func main() { defer func() { log.Println("Shutting down server...") - s.EsClient.Stop() + if !s.Args.DisableEs { + s.EsClient.Stop() + } s.GrpcServer.GracefulStop() - s.DB.Shutdown() + if !s.Args.DisableResolve { + s.DB.Shutdown() + } log.Println("Returning from main...") }() diff --git a/readme.md b/readme.md index 1612ff5..3c9dde5 100644 --- a/readme.md +++ b/readme.md @@ -72,6 +72,8 @@ tar xfzv rocksdb-6.29.5.tar.gz cd rocksdb-6.29.5 make static_lib sudo make install +export CGO_CFLAGS="-I/usr/local/lib" +export CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lstdc++ -lm -lz -lsnappy -llz4 -lzstd -lbz2" ``` ``` @@ -80,6 +82,19 @@ https://github.com/protocolbuffers/protobuf/releases/download/v3.17.1/protobuf-a If you can run `./protobuf/build.sh` without errors, you have `go` and `protoc` installed correctly. +On Linux you probably need to instead the open file limits + +``` +ulimit -n 1000000 +sysctl -w fs.file-max=1000000 +``` + +and `/etc/security/limits.conf` or `/etc/sysctl.conf` change: + +``` +fs.file-max = 1000000 +``` + Finally, run the block processor as described under Usage. ### Running from Source diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a6bd3e6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,16 @@ +certifi==2022.6.15 +cffi==1.15.1 +charset-normalizer==2.1.0 +cryptography==37.0.4 +github3.py==3.2.0 +grpcio==1.47.0 +grpcio-tools==1.47.0 +idna==3.3 +protobuf==3.20.1 +pycparser==2.21 +PyJWT==2.4.0 +python-dateutil==2.8.2 +requests==2.28.1 +six==1.16.0 +uritemplate==4.1.1 +urllib3==1.26.11 diff --git a/server/args.go b/server/args.go index 6620681..e69dcb0 100644 --- a/server/args.go +++ b/server/args.go @@ -25,6 +25,7 @@ type Args struct { EsPort string PrometheusPort string NotifierPort string + JSONRPCPort string EsIndex string RefreshDelta int CacheTTL int @@ -43,6 +44,7 @@ type Args struct { DisableResolve bool DisableBlockingAndFiltering bool DisableStartNotifier bool + DisableStartJSONRPC bool } const ( @@ -54,6 +56,7 @@ const ( DefaultEsPort = "9200" DefaultPrometheusPort = "2112" DefaultNotifierPort = "18080" + DefaultJSONRPCPort = "50001" DefaultRefreshDelta = 5 DefaultCacheTTL = 5 DefaultPeerFile = "peers.txt" @@ -67,6 +70,7 @@ const ( DefaultDisableResolve = false DefaultDisableBlockingAndFiltering = false DisableStartNotifier = false + DisableStartJSONRPC = false ) var ( @@ -112,6 +116,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) notifierPort := parser.String("", "notifier-port", &argparse.Options{Required: false, Help: "notifier port", Default: DefaultNotifierPort}) + jsonRPCPort := parser.String("", "json-rpc-port", &argparse.Options{Required: false, Help: "JSON RPC port", Default: DefaultJSONRPCPort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) @@ -131,6 +136,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { disableResolve := parser.Flag("", "disable-resolve", &argparse.Options{Required: false, Help: "Disable resolve endpoint (and rocksdb loading)", Default: DefaultDisableRockDBRefresh}) disableBlockingAndFiltering := parser.Flag("", "disable-blocking-and-filtering", &argparse.Options{Required: false, Help: "Disable blocking and filtering of channels and streams", Default: DefaultDisableBlockingAndFiltering}) disableStartNotifier := parser.Flag("", "disable-start-notifier", &argparse.Options{Required: false, Help: "Disable start notifier", Default: DisableStartNotifier}) + disableStartJSONRPC := parser.Flag("", "disable-start-jsonrpc", &argparse.Options{Required: false, Help: "Disable start jsonrpc endpoint", Default: DisableStartJSONRPC}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) @@ -157,6 +163,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { EsPort: *esPort, PrometheusPort: *prometheusPort, NotifierPort: *notifierPort, + JSONRPCPort: *jsonRPCPort, EsIndex: *esIndex, RefreshDelta: *refreshDelta, CacheTTL: *cacheTTL, @@ -175,6 +182,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { DisableResolve: *disableResolve, DisableBlockingAndFiltering: *disableBlockingAndFiltering, DisableStartNotifier: *disableStartNotifier, + DisableStartJSONRPC: *disableStartJSONRPC, } if esHost, ok := environment["ELASTIC_HOST"]; ok { diff --git a/server/federation_test.go b/server/federation_test.go index df02d25..850500f 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -55,6 +55,7 @@ func makeDefaultArgs() *server.Args { EsPort: server.DefaultEsPort, PrometheusPort: server.DefaultPrometheusPort, NotifierPort: server.DefaultNotifierPort, + JSONRPCPort: server.DefaultJSONRPCPort, EsIndex: server.DefaultEsIndex, RefreshDelta: server.DefaultRefreshDelta, CacheTTL: server.DefaultCacheTTL, @@ -70,6 +71,7 @@ func makeDefaultArgs() *server.Args { DisableResolve: true, DisableBlockingAndFiltering: true, DisableStartNotifier: true, + DisableStartJSONRPC: true, } return args diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go new file mode 100644 index 0000000..95f45e8 --- /dev/null +++ b/server/jsonrpc_service.go @@ -0,0 +1,50 @@ +package server + +import ( + "log" + "net/http" + + "github.com/gorilla/mux" + "github.com/gorilla/rpc" + "github.com/gorilla/rpc/json" + "github.com/lbryio/herald.go/db" + pb "github.com/lbryio/herald.go/protobuf/go" +) + +type JSONServer struct { + DB *db.ReadOnlyDBColumnFamily +} + +type ResolveData struct { + Data []string `json:"data"` +} + +type Result struct { + Data string `json:"data"` +} + +// Resolve is the json rpc endpoint for resolve +func (t *JSONServer) Resolve(r *http.Request, args *ResolveData, result **pb.Outputs) error { + log.Println("Resolve") + res, err := InternalResolve(args.Data, t.DB) + *result = res + return err +} + +// StartJsonRPC starts the json rpc server and registers the endpoints. +func (s *Server) StartJsonRPC() error { + server := new(JSONServer) + server.DB = s.DB + + port := ":" + s.Args.JSONRPCPort + + s1 := rpc.NewServer() // Create a new RPC server + s1.RegisterCodec(json.NewCodec(), "application/json") // Register the type of data requested as JSON + s1.RegisterService(server, "") // Register the service by creating a new JSON server + + r := mux.NewRouter() + r.Handle("/rpc", s1) + log.Fatal(http.ListenAndServe(port, r)) + + return nil +} diff --git a/server/server.go b/server/server.go index 7cea014..5aba88a 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "fmt" "hash" "io/ioutil" @@ -307,6 +308,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { } }() } + if !args.DisableStartJSONRPC { + go func() { + err := s.StartJsonRPC() + if err != nil { + log.Println("JSONRPC Server failed!", err) + } + }() + } // Load peers from disk and subscribe to one if there are any if !args.DisableLoadPeers { go func() { @@ -446,14 +455,24 @@ func (s *Server) HeightHashSubscribe() error { return nil } +// Resolve is the gRPC endpoint for resolve. func (s *Server) Resolve(ctx context.Context, args *pb.StringArray) (*pb.Outputs, error) { + return InternalResolve(args.Value, s.DB) +} + +// InternalResolve takes an array of urls and resolves them to their transactions. +func InternalResolve(urls []string, DB *db.ReadOnlyDBColumnFamily) (*pb.Outputs, error) { + if DB == nil { + return nil, errors.New("db is nil") + // return nil, nil + } metrics.RequestsCount.With(prometheus.Labels{"method": "resolve"}).Inc() allTxos := make([]*pb.Output, 0) allExtraTxos := make([]*pb.Output, 0) - for _, url := range args.Value { - res := s.DB.Resolve(url) + for _, url := range urls { + res := DB.Resolve(url) txos, extraTxos, err := res.ToOutputs() if err != nil { return nil, err