diff --git a/dev.sh b/dev.sh index 78b6117..d96fca7 100755 --- a/dev.sh +++ b/dev.sh @@ -3,4 +3,4 @@ hash reflex 2>/dev/null || go get github.com/cespare/reflex hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; } -reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev" +reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug" diff --git a/go.mod b/go.mod index 7471b86..c58e386 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/lbryio/hub go 1.16 require ( + github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 github.com/btcsuite/btcutil v1.0.2 github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 diff --git a/go.sum b/go.sum index cce7b53..6c0e5c7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk= +github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA= github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= @@ -196,6 +198,8 @@ github.com/ybbus/jsonrpc v0.0.0-20180411222309-2a548b7d822d/go.mod h1:XJrh1eMSzd github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= 
+go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -207,6 +211,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -236,6 +242,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys 
v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -271,8 +279,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -310,6 +321,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 
v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/main.go b/main.go index 6aa205e..e7daab8 100644 --- a/main.go +++ b/main.go @@ -18,11 +18,13 @@ import ( ) const ( - defaultHost = "0.0.0.0" - defaultPort = "50051" - defaultEsHost = "http://localhost" - defaultEsIndex = "claims" - defaultEsPort = "9200" + defaultHost = "0.0.0.0" + defaultPort = "50051" + defaultEsHost = "http://localhost" + defaultEsIndex = "claims" + defaultEsPort = "9200" + defaultRefreshDelta = 5 + defaultCacheTTL = 5 ) func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { @@ -71,6 +73,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex}) + refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta}) + cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) @@ -89,13 +93,15 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { } args := &server.Args{ - CmdType: 
server.SearchCmd, - Host: *host, - Port: ":" + *port, - EsHost: *esHost, - EsPort: *esPort, - EsIndex: *esIndex, - Debug: *debug, + CmdType: server.SearchCmd, + Host: *host, + Port: ":" + *port, + EsHost: *esHost, + EsPort: *esPort, + EsIndex: *esIndex, + Debug: *debug, + RefreshDelta: *refreshDelta, + CacheTTL: *cacheTTL, } if esHost, ok := environment["ELASTIC_HOST"]; ok { diff --git a/protobuf/definitions/hub.proto b/protobuf/definitions/hub.proto index 5dca601..13d2213 100644 --- a/protobuf/definitions/hub.proto +++ b/protobuf/definitions/hub.proto @@ -48,7 +48,7 @@ message SearchRequest { InvertibleField claim_id = 1; InvertibleField channel_id = 2; string text = 3; - uint32 limit = 4; + int32 limit = 4; repeated string order_by = 5; uint32 offset = 6; bool is_controlling = 7; @@ -95,7 +95,7 @@ message SearchRequest { repeated string not_tags = 51; bool has_channel_signature = 52; BoolValue has_source = 53; - uint32 limit_claims_per_channel = 54; + int32 limit_claims_per_channel = 54; repeated string any_languages = 55; repeated string all_languages = 56; bool remove_duplicates = 57; diff --git a/protobuf/go/hub.pb.go b/protobuf/go/hub.pb.go index b6ee72d..46c53fb 100644 --- a/protobuf/go/hub.pb.go +++ b/protobuf/go/hub.pb.go @@ -372,7 +372,7 @@ type SearchRequest struct { ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"` ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"` Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"` - Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` + Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"` Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"` IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"` @@ 
-419,7 +419,7 @@ type SearchRequest struct { NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"` HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"` HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"` - LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` + LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"` @@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string { return "" } -func (x *SearchRequest) GetLimit() uint32 { +func (x *SearchRequest) GetLimit() int32 { if x != nil { return x.Limit } @@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue { return nil } -func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 { +func (x *SearchRequest) GetLimitClaimsPerChannel() int32 { if x != nil { return x.LimitClaimsPerChannel } @@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{ 0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d, + 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 
0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f, @@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{ 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68, 0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, - 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, + 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, diff --git a/server/search.go b/server/search.go index ad7eafb..69e4620 100644 --- a/server/search.go +++ b/server/search.go @@ -18,6 +18,7 @@ import ( "github.com/olivere/elastic/v7" "golang.org/x/text/cases" "golang.org/x/text/language" + "google.golang.org/protobuf/encoding/protojson" "gopkg.in/karalabe/cookiejar.v1/collections/deque" ) @@ -179,43 +180,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var pageSize = 10 var orderBy []orderField var searchIndices []string + var searchResult *elastic.SearchResult = nil client := s.EsClient searchIndices = make([]string, 0, 1) searchIndices = append(searchIndices, s.Args.EsIndex) - q := elastic.NewBoolQuery() + //Code for debugging locally + //indices, _ := client.IndexNames() + //for _, index := range indices { + // if index != "claims" { + // 
log.Println(index) + // searchIndices = append(searchIndices, index) + // } + //} - err := s.checkQuery(in) - if err != nil { - return nil, err - } - q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) - - fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") - search := client.Search(). - Index(searchIndices...). - FetchSourceContext(fsc). - Query(q). // specify the query - From(0).Size(DefaultSearchSize) - - for _, x := range orderBy { - search = search.Sort(x.Field, x.IsAsc) + // If it's been more than RefreshDelta time since we last checked if the + // es index has been refreshed, we check (this is 2 seconds in prod, + // 0 seconds in debug / unit testing). If the index has been refreshed + // a different number of times since we last checked, we purge the cache + if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) { + res, err := client.IndexStats(searchIndices[0]).Do(ctx) + if err != nil { + log.Printf("Error on ES index stats\n%v\n", err) + } + numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total + if numRefreshes != s.NumESRefreshes { + _ = s.QueryCache.Purge() + s.NumESRefreshes = numRefreshes + } } - searchResult, err := search.Do(ctx) // execute - if err != nil && elastic.IsNotFound(err) { - log.Println("Index returned 404! Check writer. Index: ", searchIndices) - return &pb.Outputs{}, nil + var records []*record - } else if err != nil { - s.recordErrorAndReturn(err, "search_errors") - log.Println("Error executing query: ", err) - return nil, err + cacheKey := s.serializeSearchRequest(in) + + setPageVars(in, &pageSize, &from) + + /* + cache based on search request params + include from value and number of results. + When another search request comes in with same search params + and same or increased offset (which we currently don't even use?) + that will be a cache hit. + FIXME: For now the cache is turned off when in debugging mode + (for unit tests) because it breaks on some of them. 
+ FIXME: Currently the cache just skips the initial search, + the mgets and post processing are still done. There's probably + a more efficient way to store the final result. + */ + + if val, err := s.QueryCache.Get(cacheKey); err != nil { + + q := elastic.NewBoolQuery() + + err := s.checkQuery(in) + if err != nil { + return nil, err + } + q = s.setupEsQuery(q, in, &orderBy) + + fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") + search := client.Search(). + Index(searchIndices...). + FetchSourceContext(fsc). + Query(q). // specify the query + From(0).Size(DefaultSearchSize) + + for _, x := range orderBy { + search = search.Sort(x.Field, x.IsAsc) + } + + searchResult, err = search.Do(ctx) // execute + if err != nil && elastic.IsNotFound(err) { + log.Println("Index returned 404! Check writer. Index: ", searchIndices) + return &pb.Outputs{}, nil + + } else if err != nil { + s.recordErrorAndReturn(err, "search_errors") + log.Println("Error executing query: ", err) + return nil, err + } + + log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) + + records = s.searchResultToRecords(searchResult) + err = s.QueryCache.Set(cacheKey, records) + if err != nil { + //FIXME: Should this be fatal? 
+ log.Println("Error storing records in cache: ", err) + } + } else { + records = val.([]*record) } - log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) - - txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices) + txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices) t1 := time.Now() @@ -265,21 +323,9 @@ func (s *Server) cleanTags(tags []string) []string { return cleanedTags } -func (s *Server) postProcessResults( - ctx context.Context, - client *elastic.Client, - searchResult *elastic.SearchResult, - in *pb.SearchRequest, - pageSize int, - from int, - searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { - var txos []*pb.Output - var records []*record - var blockedRecords []*record - var blocked []*pb.Blocked - var blockedMap map[string]*pb.Blocked - - records = make([]*record, 0, searchResult.TotalHits()) +func (s *Server) searchResultToRecords( + searchResult *elastic.SearchResult) []*record { + records := make([]*record, 0, searchResult.TotalHits()) var r record for _, item := range searchResult.Each(reflect.TypeOf(r)) { @@ -288,6 +334,22 @@ func (s *Server) postProcessResults( } } + return records +} + +func (s *Server) postProcessResults( + ctx context.Context, + client *elastic.Client, + records []*record, + in *pb.SearchRequest, + pageSize int, + from int, + searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { + var txos []*pb.Output + var blockedRecords []*record + var blocked []*pb.Blocked + var blockedMap map[string]*pb.Blocked + //printJsonFullResults(searchResult) records, blockedRecords, blockedMap = removeBlocked(records) @@ -353,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error { for name, failed := range checks { if failed { time.Sleep(2) // throttle - return fmt.Errorf("%s cant have more than %d items", name, limit) + return 
fmt.Errorf("%s can't have more than %d items", name, limit) + } + } + return nil +} + +func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) { + if in.Limit > 0 { + log.Printf("############ limit: %d\n", in.Limit) + *pageSize = int(in.Limit) + } + + if in.Offset > 0 { + *from = int(in.Offset) + } +} + func (s *Server) setupEsQuery( q *elastic.BoolQuery, in *pb.SearchRequest, - pageSize *int, - from *int, orderBy *[]orderField) *elastic.BoolQuery { claimTypes := map[string]int{ "stream": 1, @@ -418,14 +489,6 @@ func (s *Server) setupEsQuery( q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling)) } - if in.Limit > 0 { - *pageSize = int(in.Limit) - } - - if in.Offset > 0 { - *from = int(in.Offset) - } - if len(in.ClaimName) > 0 { in.NormalizedName = util.NormalizeName(in.ClaimName) } @@ -617,7 +680,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct return channelTxos, channels } -func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { +func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { var totalReposted = 0 var mget = client.Mget() //.StoredFields("_id") @@ -669,6 +732,29 @@ func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Clien return claims, repostedRecords, respostedMap } +/* + Takes a search request and serializes into a string for use as a key into the + internal cache for the hub.
+*/ +func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string { + // Save the offset / limit and set to zero, cache hits should happen regardless + // and they're used in post processing + //offset, limit := request.Offset, request.Limit + //request.Offset = 0 + //request.Limit = 0 + + bytes, err := protojson.Marshal(request) + if err != nil { + return "" + } + str := string((*s.S256).Sum(bytes)) + // log.Println(str) + //request.Offset = offset + //request.Limit = limit + + return str +} + func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { finalHits := make([]*record, 0, len(searchHits)) var channelCounters map[string]int @@ -803,3 +889,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B return newHits, blockedHits, blockedChannels } + diff --git a/server/server.go b/server/server.go index dde6d4e..66a88b6 100644 --- a/server/server.go +++ b/server/server.go @@ -2,14 +2,16 @@ package server import ( "context" + "crypto/sha256" "fmt" + "hash" "log" + "net/http" "os" "regexp" - - "net/http" "time" + "github.com/ReneKroon/ttlcache/v2" "github.com/lbryio/hub/meta" pb "github.com/lbryio/hub/protobuf/go" "github.com/olivere/elastic/v7" @@ -18,12 +20,17 @@ import ( ) type Server struct { - GrpcServer *grpc.Server - Args *Args - MultiSpaceRe *regexp.Regexp - WeirdCharsRe *regexp.Regexp - EsClient *elastic.Client - Servers []*FederatedServer + GrpcServer *grpc.Server + Args *Args + MultiSpaceRe *regexp.Regexp + WeirdCharsRe *regexp.Regexp + EsClient *elastic.Client + Servers []*FederatedServer + QueryCache *ttlcache.Cache + S256 *hash.Hash + LastRefreshCheck time.Time + RefreshDelta time.Duration + NumESRefreshes int64 pb.UnimplementedHubServer } @@ -41,13 +48,15 @@ const ( type Args struct { // TODO Make command types an enum - CmdType int - Host string - Port string - EsHost string - EsPort string - EsIndex string - Debug bool + CmdType int + Host string + Port string + EsHost string
+ EsPort string + EsIndex string + Debug bool + RefreshDelta int + CacheTTL int } func getVersion() string { @@ -123,12 +132,29 @@ func MakeHubServer(args *Args) *Server { if err != nil { log.Fatal(err) } + + cache := ttlcache.NewCache() + err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute) + if err != nil { + log.Fatal(err) + } + s256 := sha256.New() + var refreshDelta = time.Second * time.Duration(args.RefreshDelta) + if args.Debug { + refreshDelta = time.Second * 0 + } + s := &Server{ - GrpcServer: grpcServer, - Args: args, - MultiSpaceRe: multiSpaceRe, - WeirdCharsRe: weirdCharsRe, - EsClient: client, + GrpcServer: grpcServer, + Args: args, + MultiSpaceRe: multiSpaceRe, + WeirdCharsRe: weirdCharsRe, + EsClient: client, + QueryCache: cache, + S256: &s256, + LastRefreshCheck: time.Now(), + RefreshDelta: refreshDelta, + NumESRefreshes: 0, } return s