From 076adcca39e478806b3b3d4f1e9a9f8ee058ff85 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Thu, 30 Sep 2021 15:03:00 -0400 Subject: [PATCH 1/6] Initial caching --- server/search.go | 93 ++++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/server/search.go b/server/search.go index ad7eafb..983f775 100644 --- a/server/search.go +++ b/server/search.go @@ -179,43 +179,61 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var pageSize = 10 var orderBy []orderField var searchIndices []string + var searchResult *elastic.SearchResult = nil client := s.EsClient searchIndices = make([]string, 0, 1) searchIndices = append(searchIndices, s.Args.EsIndex) - q := elastic.NewBoolQuery() + cacheHit := false + var cachedRecords []*record + /* + TODO: cache based on search request params + include from value and number of results. + When another search request comes in with same search params + and same or increased offset (which we currently don't even use?) + that will be a cache hit. + */ - err := s.checkQuery(in) - if err != nil { - return nil, err - } - q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) + if !cacheHit { + q := elastic.NewBoolQuery() - fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") - search := client.Search(). - Index(searchIndices...). - FetchSourceContext(fsc). - Query(q). // specify the query - From(0).Size(DefaultSearchSize) + err := s.checkQuery(in) + if err != nil { + return nil, err + } + q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) - for _, x := range orderBy { - search = search.Sort(x.Field, x.IsAsc) + fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") + search := client.Search(). + Index(searchIndices...). + FetchSourceContext(fsc). + Query(q). // specify the query + From(0).Size(DefaultSearchSize) + + for _, x := range orderBy { + search = search.Sort(x.Field, x.IsAsc) + } + + searchResult, err = search.Do(ctx) // execute + if err != nil && elastic.IsNotFound(err) { + log.Println("Index returned 404! Check writer. Index: ", searchIndices) + return &pb.Outputs{}, nil + + } else if err != nil { + s.recordErrorAndReturn(err, "search_errors") + log.Println("Error executing query: ", err) + return nil, err + } + + log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) + + cachedRecords = make([]*record, 0, 0) + } else { + //TODO fill cached records here + cachedRecords = make([]*record, 0, 0) } - searchResult, err := search.Do(ctx) // execute - if err != nil && elastic.IsNotFound(err) { - log.Println("Index returned 404! Check writer. Index: ", searchIndices) - return &pb.Outputs{}, nil - - } else if err != nil { - s.recordErrorAndReturn(err, "search_errors") - log.Println("Error executing query: ", err) - return nil, err - } - - log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) - - txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices) + txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices, cachedRecords) t1 := time.Now() @@ -272,20 +290,25 @@ func (s *Server) postProcessResults( in *pb.SearchRequest, pageSize int, from int, - searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { + searchIndices []string, + cachedRecords []*record) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { var txos []*pb.Output var records []*record var blockedRecords []*record var blocked []*pb.Blocked var blockedMap map[string]*pb.Blocked - records = make([]*record, 0, searchResult.TotalHits()) + if len(cachedRecords) < 0 { + records = make([]*record, 0, searchResult.TotalHits()) - var r record - for _, item := range searchResult.Each(reflect.TypeOf(r)) { - if t, ok := item.(record); ok { - records = append(records, &t) + var r record + for _, item := range searchResult.Each(reflect.TypeOf(r)) { + if t, ok := item.(record); ok { + records = append(records, &t) + } } + } else { + records = cachedRecords } //printJsonFullResults(searchResult) @@ -617,7 +640,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct return channelTxos, channels } -func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { +func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { var totalReposted = 0 var mget = client.Mget() //.StoredFields("_id") -- 2.45.3 From 36b4a3cdd9d15a4421d1ab6efc4032e414ec65b8 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Fri, 1 Oct 2021 13:54:03 -0400 Subject: [PATCH 2/6] implement caching --- dev.sh | 2 +- go.mod | 3 +- go.sum | 12 ++++++ server/search.go | 97 ++++++++++++++++++++++++++++++++++-------------- server/server.go | 17 ++++++++- 5 files changed, 99 insertions(+), 32 deletions(-) diff --git a/dev.sh b/dev.sh index 78b6117..d96fca7 100755 --- a/dev.sh +++ b/dev.sh @@ -3,4 +3,4 @@ hash reflex 2>/dev/null || go get github.com/cespare/reflex hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; } -reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev" +reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug" diff --git a/go.mod b/go.mod index 7471b86..59d908d 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/lbryio/hub go 1.16 require ( + github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 - github.com/btcsuite/btcutil v1.0.2 + github.com/btcsuite/btcutil v1.0.2 // indirect github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index cce7b53..6c0e5c7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk= +github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA= github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= @@ -196,6 +198,8 @@ github.com/ybbus/jsonrpc v0.0.0-20180411222309-2a548b7d822d/go.mod h1:XJrh1eMSzd github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -207,6 +211,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -236,6 +242,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -271,8 +279,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -310,6 +321,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/server/search.go b/server/search.go index 983f775..274c762 100644 --- a/server/search.go +++ b/server/search.go @@ -2,8 +2,10 @@ package server import ( "context" + "encoding/hex" "encoding/json" "fmt" + "github.com/btcsuite/btcutil/base58" "log" "math" "reflect" @@ -18,6 +20,7 @@ import ( "github.com/olivere/elastic/v7" "golang.org/x/text/cases" "golang.org/x/text/language" + "google.golang.org/protobuf/encoding/protojson" "gopkg.in/karalabe/cookiejar.v1/collections/deque" ) @@ -182,19 +185,36 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var searchResult *elastic.SearchResult = nil client := s.EsClient searchIndices = make([]string, 0, 1) - searchIndices = append(searchIndices, s.Args.EsIndex) + //searchIndices = append(searchIndices, s.Args.EsIndex) + + indices, _ := client.IndexNames() + for _, index := range indices { + if index != "claims" { + log.Println(index) + searchIndices = append(searchIndices, index) + } + } + + //cacheHit := false + var records []*record + + cacheKey := s.serializeSearchRequest(in) - cacheHit := false - var cachedRecords []*record /* - TODO: cache based on search request params + cache based on search request params include from value and number of results. When another search request comes in with same search params and same or increased offset (which we currently don't even use?) that will be a cache hit. + FIXME: For now the cache is turned off when in debugging mode + (for unit tests) because it breaks on some of them. + FIXME: Currently the cache just skips the initial search, + the mgets and post processing are still done. There's probably + a more efficient way to store the final result. */ - if !cacheHit { + if val, err := s.QueryCache.Get(cacheKey); err != nil || s.Args.Debug { + q := elastic.NewBoolQuery() err := s.checkQuery(in) @@ -227,13 +247,17 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) - cachedRecords = make([]*record, 0, 0) + records = s.searchResultToRecords(searchResult) + err = s.QueryCache.Set(cacheKey, records) + if err != nil { + //FIXME: Should this be fatal? + log.Println("Error storing records in cache: ", err) + } } else { - //TODO fill cached records here - cachedRecords = make([]*record, 0, 0) + records = val.([]*record) } - txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices, cachedRecords) + txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices) t1 := time.Now() @@ -283,34 +307,33 @@ func (s *Server) cleanTags(tags []string) []string { return cleanedTags } +func (s *Server) searchResultToRecords( + searchResult *elastic.SearchResult) []*record { + records := make([]*record, 0, searchResult.TotalHits()) + + var r record + for _, item := range searchResult.Each(reflect.TypeOf(r)) { + if t, ok := item.(record); ok { + records = append(records, &t) + } + } + + return records +} + func (s *Server) postProcessResults( ctx context.Context, client *elastic.Client, - searchResult *elastic.SearchResult, + records []*record, in *pb.SearchRequest, pageSize int, from int, - searchIndices []string, - cachedRecords []*record) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { + searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { var txos []*pb.Output - var records []*record var blockedRecords []*record var blocked []*pb.Blocked var blockedMap map[string]*pb.Blocked - if len(cachedRecords) < 0 { - records = make([]*record, 0, searchResult.TotalHits()) - - var r record - for _, item := range searchResult.Each(reflect.TypeOf(r)) { - if t, ok := item.(record); ok { - records = append(records, &t) - } - } - } else { - records = cachedRecords - } - //printJsonFullResults(searchResult) records, blockedRecords, blockedMap = removeBlocked(records) @@ -508,7 +531,9 @@ func (s *Server) setupEsQuery( } if in.PublicKeyId != "" { - q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) + value := hex.EncodeToString(base58.Decode(in.PublicKeyId)[1:21]) + q = q.Must(elastic.NewTermQuery("public_key_id.keyword", value)) + // q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) } if in.HasChannelSignature { @@ -544,7 +569,8 @@ func (s *Server) setupEsQuery( q = AddTermField(q, in.ShortUrl, "short_url.keyword") q = AddTermField(q, in.Signature, "signature.keyword") q = AddTermField(q, in.TxId, "tx_id.keyword") - q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") + // q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") + q = AddTermField(q, in.FeeCurrency, "fee_currency.keyword") q = AddTermField(q, in.RepostedClaimId, "reposted_claim_id.keyword") q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword") @@ -692,6 +718,20 @@ func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client return claims, repostedRecords, respostedMap } +/* + Takes a search request and serializes into a string for use as a key into the + internal cache for the hub. +*/ +func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string { + bytes, err := protojson.Marshal(request) + if err != nil { + return "" + } + str := string((*s.S256).Sum(bytes)) + log.Println(str) + return str +} + func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { finalHits := make([]*record, 0, len(searchHits)) var channelCounters map[string]int @@ -826,3 +866,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B return newHits, blockedHits, blockedChannels } + diff --git a/server/server.go b/server/server.go index dde6d4e..d6e68f0 100644 --- a/server/server.go +++ b/server/server.go @@ -2,14 +2,16 @@ package server import ( "context" + "crypto/sha256" "fmt" + "hash" "log" + "net/http" "os" "regexp" - - "net/http" "time" + "github.com/ReneKroon/ttlcache/v2" "github.com/lbryio/hub/meta" pb "github.com/lbryio/hub/protobuf/go" "github.com/olivere/elastic/v7" @@ -24,6 +26,8 @@ type Server struct { WeirdCharsRe *regexp.Regexp EsClient *elastic.Client Servers []*FederatedServer + QueryCache *ttlcache.Cache + S256 *hash.Hash pb.UnimplementedHubServer } @@ -123,12 +127,21 @@ func MakeHubServer(args *Args) *Server { if err != nil { log.Fatal(err) } + + cache := ttlcache.NewCache() + err = cache.SetTTL(5 * time.Minute) + if err != nil { + log.Fatal(err) + } + s256 := sha256.New() s := &Server{ GrpcServer: grpcServer, Args: args, MultiSpaceRe: multiSpaceRe, WeirdCharsRe: weirdCharsRe, EsClient: client, + QueryCache: cache, + S256: &s256, } return s -- 2.45.3 From 0d59480f3c2d67974ef98965fd2e9706481bea88 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 2 Oct 2021 22:49:49 -0400 Subject: [PATCH 3/6] bug fixes and cache purging --- go.mod | 2 +- protobuf/definitions/hub.proto | 4 +-- protobuf/go/hub.pb.go | 12 ++++---- server/search.go | 56 +++++++++++++++++++++++++--------- server/server.go | 41 ++++++++++++++++--------- 5 files changed, 76 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 59d908d..c58e386 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 - github.com/btcsuite/btcutil v1.0.2 // indirect + github.com/btcsuite/btcutil v1.0.2 github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 github.com/prometheus/client_golang v1.11.0 diff --git a/protobuf/definitions/hub.proto b/protobuf/definitions/hub.proto index 5dca601..13d2213 100644 --- a/protobuf/definitions/hub.proto +++ b/protobuf/definitions/hub.proto @@ -48,7 +48,7 @@ message SearchRequest { InvertibleField claim_id = 1; InvertibleField channel_id = 2; string text = 3; - uint32 limit = 4; + int32 limit = 4; repeated string order_by = 5; uint32 offset = 6; bool is_controlling = 7; @@ -95,7 +95,7 @@ message SearchRequest { repeated string not_tags = 51; bool has_channel_signature = 52; BoolValue has_source = 53; - uint32 limit_claims_per_channel = 54; + int32 limit_claims_per_channel = 54; repeated string any_languages = 55; repeated string all_languages = 56; bool remove_duplicates = 57; diff --git a/protobuf/go/hub.pb.go b/protobuf/go/hub.pb.go index b6ee72d..46c53fb 100644 --- a/protobuf/go/hub.pb.go +++ b/protobuf/go/hub.pb.go @@ -372,7 +372,7 @@ type SearchRequest struct { ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"` ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"` Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"` - Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` + Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"` Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"` IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"` @@ -419,7 +419,7 @@ type SearchRequest struct { NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"` HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"` HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"` - LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` + LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"` @@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string { return "" } -func (x *SearchRequest) GetLimit() uint32 { +func (x *SearchRequest) GetLimit() int32 { if x != nil { return x.Limit } @@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue { return nil } -func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 { +func (x *SearchRequest) GetLimitClaimsPerChannel() int32 { if x != nil { return x.LimitClaimsPerChannel } @@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{ 0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d, + 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f, @@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{ 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68, 0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, - 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, + 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, diff --git a/server/search.go b/server/search.go index 274c762..442720b 100644 --- a/server/search.go +++ b/server/search.go @@ -187,6 +187,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, searchIndices = make([]string, 0, 1) //searchIndices = append(searchIndices, s.Args.EsIndex) + //Code for debugging locally indices, _ := client.IndexNames() for _, index := range indices { if index != "claims" { @@ -195,11 +196,26 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } - //cacheHit := false + // If it's been more than RefreshDelta time since we last checked if the + // es index has been refreshed, we check (this is 2 seconds in prod, + // 0 seconds in debug / unit testing). If the index has been refreshed + // a different number of times since we last checked, we purge the cache + if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) { + // FIXME: Should this be on all indices + res, _ := client.IndexStats(searchIndices[0]).Do(ctx) + numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total + if numRefreshes != s.NumESRefreshes { + _ = s.QueryCache.Purge() + s.NumESRefreshes = numRefreshes + } + } + var records []*record cacheKey := s.serializeSearchRequest(in) + setPageVars(in, &pageSize, &from) + /* cache based on search request params include from value and number of results. @@ -213,7 +229,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, a more efficient way to store the final result. */ - if val, err := s.QueryCache.Get(cacheKey); err != nil || s.Args.Debug { + if val, err := s.QueryCache.Get(cacheKey); err != nil { q := elastic.NewBoolQuery() @@ -221,7 +237,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, if err != nil { return nil, err } - q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) + q = s.setupEsQuery(q, in, &orderBy) fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") search := client.Search(). @@ -399,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error { for name, failed := range checks { if failed { time.Sleep(2) // throttle - return fmt.Errorf("%s cant have more than %d items", name, limit) + return fmt.Errorf("%s cant have more than %d items.", name, limit) } } return nil } +func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) { + if in.Limit > 0 { + log.Printf("############ limit: %d\n", in.Limit) + *pageSize = int(in.Limit) + } + + if in.Offset > 0 { + *from = int(in.Offset) + } +} + func (s *Server) setupEsQuery( q *elastic.BoolQuery, in *pb.SearchRequest, - pageSize *int, - from *int, orderBy *[]orderField) *elastic.BoolQuery { claimTypes := map[string]int{ "stream": 1, @@ -464,14 +489,6 @@ func (s *Server) setupEsQuery( q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling)) } - if in.Limit > 0 { - *pageSize = int(in.Limit) - } - - if in.Offset > 0 { - *from = int(in.Offset) - } - if len(in.ClaimName) > 0 { in.NormalizedName = util.NormalizeName(in.ClaimName) } @@ -723,12 +740,21 @@ func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client internal cache for the hub. */ func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string { + // Save the offest / limit and set to zero, cache hits should happen regardless + // and they're used in post processing + //offset, limit := request.Offset, request.Limit + //request.Offset = 0 + //request.Limit = 0 + bytes, err := protojson.Marshal(request) if err != nil { return "" } str := string((*s.S256).Sum(bytes)) - log.Println(str) + // log.Println(str) + //request.Offset = offset + //request.Limit = limit + return str } diff --git a/server/server.go b/server/server.go index d6e68f0..bb34112 100644 --- a/server/server.go +++ b/server/server.go @@ -20,14 +20,17 @@ import ( ) type Server struct { - GrpcServer *grpc.Server - Args *Args - MultiSpaceRe *regexp.Regexp - WeirdCharsRe *regexp.Regexp - EsClient *elastic.Client - Servers []*FederatedServer - QueryCache *ttlcache.Cache - S256 *hash.Hash + GrpcServer *grpc.Server + Args *Args + MultiSpaceRe *regexp.Regexp + WeirdCharsRe *regexp.Regexp + EsClient *elastic.Client + Servers []*FederatedServer + QueryCache *ttlcache.Cache + S256 *hash.Hash + LastRefreshCheck time.Time + RefreshDelta time.Duration + NumESRefreshes int64 pb.UnimplementedHubServer } @@ -134,14 +137,22 @@ func MakeHubServer(args *Args) *Server { log.Fatal(err) } s256 := sha256.New() + var refreshDelta = time.Second * 2 + if args.Debug { + refreshDelta = time.Second * 0 + } + s := &Server{ - GrpcServer: grpcServer, - Args: args, - MultiSpaceRe: multiSpaceRe, - WeirdCharsRe: weirdCharsRe, - EsClient: client, - QueryCache: cache, - S256: &s256, + GrpcServer: grpcServer, + Args: args, + MultiSpaceRe: multiSpaceRe, + WeirdCharsRe: weirdCharsRe, + EsClient: client, + QueryCache: cache, + S256: &s256, + LastRefreshCheck: time.Now(), + RefreshDelta: refreshDelta, + NumESRefreshes: 0, } return s -- 2.45.3 From a0de5164ba3d1c50d49c80dc1810b9c0592b94aa Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 2 Oct 2021 23:13:47 -0400 Subject: [PATCH 4/6] cleanup --- server/search.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/search.go b/server/search.go index 442720b..0ffaf45 100644 --- a/server/search.go +++ b/server/search.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/btcsuite/btcutil/base58" "log" "math" "reflect" @@ -15,6 +14,7 @@ import ( //"github.com/lbryio/hub/schema" + "github.com/btcsuite/btcutil/base58" pb "github.com/lbryio/hub/protobuf/go" "github.com/lbryio/lbry.go/v2/extras/util" "github.com/olivere/elastic/v7" @@ -185,16 +185,16 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var searchResult *elastic.SearchResult = nil client := s.EsClient searchIndices = make([]string, 0, 1) - //searchIndices = append(searchIndices, s.Args.EsIndex) + searchIndices = append(searchIndices, s.Args.EsIndex) //Code for debugging locally - indices, _ := client.IndexNames() - for _, index := range indices { - if index != "claims" { - log.Println(index) - searchIndices = append(searchIndices, index) - } - } + //indices, _ := client.IndexNames() + //for _, index := range indices { + // if index != "claims" { + // log.Println(index) + // searchIndices = append(searchIndices, index) + // } + //} // If it's been more than RefreshDelta time since we last checked if the // es index has been refreshed, we check (this is 2 seconds in prod, -- 2.45.3 From 220a42984e8337e1b6aba155b47742465d6546ac Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Mon, 4 Oct 2021 12:44:55 -0400 Subject: [PATCH 5/6] put jack's changes back --- server/search.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/server/search.go b/server/search.go index 0ffaf45..1093cae 100644 --- a/server/search.go +++ b/server/search.go @@ -2,7 +2,6 @@ package server import ( "context" - "encoding/hex" "encoding/json" "fmt" "log" @@ -14,7 +13,6 @@ import ( //"github.com/lbryio/hub/schema" - "github.com/btcsuite/btcutil/base58" pb "github.com/lbryio/hub/protobuf/go" "github.com/lbryio/lbry.go/v2/extras/util" "github.com/olivere/elastic/v7" @@ -548,9 +546,7 @@ func (s *Server) setupEsQuery( } if in.PublicKeyId != "" { - value := hex.EncodeToString(base58.Decode(in.PublicKeyId)[1:21]) - q = q.Must(elastic.NewTermQuery("public_key_id.keyword", value)) - // q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) + q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) } if in.HasChannelSignature { @@ -586,8 +582,7 @@ func (s *Server) setupEsQuery( q = AddTermField(q, in.ShortUrl, "short_url.keyword") q = AddTermField(q, in.Signature, "signature.keyword") q = AddTermField(q, in.TxId, "tx_id.keyword") - // q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") - q = AddTermField(q, in.FeeCurrency, "fee_currency.keyword") + q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") q = AddTermField(q, in.RepostedClaimId, "reposted_claim_id.keyword") q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword") -- 2.45.3 From a6d47e662a64408878a2b09c110a3fb0ec0206c6 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Mon, 4 Oct 2021 17:58:27 -0400 Subject: [PATCH 6/6] Don't swallow error, cachettl and refresh delta as params --- main.go | 30 ++++++++++++++++++------------ server/search.go | 6 ++++-- server/server.go | 20 +++++++++++--------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index 6aa205e..e7daab8 100644 --- a/main.go +++ b/main.go @@ -18,11 +18,13 @@ import ( ) const ( - defaultHost = "0.0.0.0" - defaultPort = "50051" - defaultEsHost = "http://localhost" - defaultEsIndex = "claims" - defaultEsPort = "9200" + defaultHost = "0.0.0.0" + defaultPort = "50051" + defaultEsHost = "http://localhost" + defaultEsIndex = "claims" + defaultEsPort = "9200" + defaultRefreshDelta = 5 + defaultCacheTTL = 5 ) func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { @@ -71,6 +73,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex}) + refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta}) + cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) @@ -89,13 +93,15 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { } args := &server.Args{ - CmdType: server.SearchCmd, - Host: *host, - Port: ":" + *port, - EsHost: *esHost, - EsPort: *esPort, - EsIndex: *esIndex, - Debug: *debug, + CmdType: server.SearchCmd, + Host: *host, + Port: ":" + *port, + EsHost: *esHost, + EsPort: *esPort, + EsIndex: *esIndex, + Debug: *debug, + RefreshDelta: *refreshDelta, + CacheTTL: *cacheTTL, } if esHost, ok := environment["ELASTIC_HOST"]; ok { diff --git a/server/search.go b/server/search.go index 1093cae..69e4620 100644 --- a/server/search.go +++ b/server/search.go @@ -199,8 +199,10 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, // 0 seconds in debug / unit testing). If the index has been refreshed // a different number of times since we last checked, we purge the cache if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) { - // FIXME: Should this be on all indices - res, _ := client.IndexStats(searchIndices[0]).Do(ctx) + res, err := client.IndexStats(searchIndices[0]).Do(ctx) + if err != nil { + log.Printf("Error on ES index stats\n%v\n", err) + } numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total if numRefreshes != s.NumESRefreshes { _ = s.QueryCache.Purge() diff --git a/server/server.go b/server/server.go index bb34112..66a88b6 100644 --- a/server/server.go +++ b/server/server.go @@ -48,13 +48,15 @@ const ( type Args struct { // TODO Make command types an enum - CmdType int - Host string - Port string - EsHost string - EsPort string - EsIndex string - Debug bool + CmdType int + Host string + Port string + EsHost string + EsPort string + EsIndex string + Debug bool + RefreshDelta int + CacheTTL int } func getVersion() string { @@ -132,12 +134,12 @@ func MakeHubServer(args *Args) *Server { } cache := ttlcache.NewCache() - err = cache.SetTTL(5 * time.Minute) + err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute) if err != nil { log.Fatal(err) } s256 := sha256.New() - var refreshDelta = time.Second * 2 + var refreshDelta = time.Second * time.Duration(args.RefreshDelta) if args.Debug { refreshDelta = time.Second * 0 } -- 2.45.3