diff --git a/go.mod b/go.mod index 59d908d..c58e386 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 - github.com/btcsuite/btcutil v1.0.2 // indirect + github.com/btcsuite/btcutil v1.0.2 github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 github.com/prometheus/client_golang v1.11.0 diff --git a/protobuf/definitions/hub.proto b/protobuf/definitions/hub.proto index 5dca601..13d2213 100644 --- a/protobuf/definitions/hub.proto +++ b/protobuf/definitions/hub.proto @@ -48,7 +48,7 @@ message SearchRequest { InvertibleField claim_id = 1; InvertibleField channel_id = 2; string text = 3; - uint32 limit = 4; + int32 limit = 4; repeated string order_by = 5; uint32 offset = 6; bool is_controlling = 7; @@ -95,7 +95,7 @@ message SearchRequest { repeated string not_tags = 51; bool has_channel_signature = 52; BoolValue has_source = 53; - uint32 limit_claims_per_channel = 54; + int32 limit_claims_per_channel = 54; repeated string any_languages = 55; repeated string all_languages = 56; bool remove_duplicates = 57; diff --git a/protobuf/go/hub.pb.go b/protobuf/go/hub.pb.go index b6ee72d..46c53fb 100644 --- a/protobuf/go/hub.pb.go +++ b/protobuf/go/hub.pb.go @@ -372,7 +372,7 @@ type SearchRequest struct { ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"` ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"` Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"` - Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` + Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"` Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"` IsControlling bool 
`protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"` @@ -419,7 +419,7 @@ type SearchRequest struct { NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"` HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"` HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"` - LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` + LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"` @@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string { return "" } -func (x *SearchRequest) GetLimit() uint32 { +func (x *SearchRequest) GetLimit() int32 { if x != nil { return x.Limit } @@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue { return nil } -func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 { +func (x *SearchRequest) GetLimitClaimsPerChannel() int32 { if x != nil { return x.LimitClaimsPerChannel } @@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{ 0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 
0x52, 0x05, 0x6c, 0x69, 0x6d, + 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f, @@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{ 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68, 0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, - 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, + 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, diff --git a/server/search.go b/server/search.go index 274c762..442720b 100644 --- a/server/search.go +++ b/server/search.go @@ -187,6 +187,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, searchIndices = make([]string, 0, 1) //searchIndices = append(searchIndices, s.Args.EsIndex) + //Code for debugging locally indices, _ := client.IndexNames() for _, index := range indices { if index != "claims" { @@ -195,11 +196,26 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } - //cacheHit := false + // If it's been more than RefreshDelta time since we last checked if the + // es index has been refreshed, we check (this is 2 seconds in prod, + // 0 seconds in debug / unit testing). 
If the index has been refreshed + // a different number of times since we last checked, we purge the cache + if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) { + // FIXME: Should this be on all indices + res, _ := client.IndexStats(searchIndices[0]).Do(ctx) + numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total + if numRefreshes != s.NumESRefreshes { + _ = s.QueryCache.Purge() + s.NumESRefreshes = numRefreshes + } + } + var records []*record cacheKey := s.serializeSearchRequest(in) + setPageVars(in, &pageSize, &from) + /* cache based on search request params include from value and number of results. @@ -213,7 +229,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, a more efficient way to store the final result. */ - if val, err := s.QueryCache.Get(cacheKey); err != nil || s.Args.Debug { + if val, err := s.QueryCache.Get(cacheKey); err != nil { q := elastic.NewBoolQuery() @@ -221,7 +237,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, if err != nil { return nil, err } - q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) + q = s.setupEsQuery(q, in, &orderBy) fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") search := client.Search(). 
@@ -399,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error { for name, failed := range checks { if failed { time.Sleep(2) // throttle - return fmt.Errorf("%s cant have more than %d items", name, limit) + return fmt.Errorf("%s can't have more than %d items", name, limit) } } return nil } +// setPageVars applies any positive limit/offset from the request to the paging vars. +func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) { + if in.Limit > 0 { + *pageSize = int(in.Limit) + } + + if in.Offset > 0 { + *from = int(in.Offset) + } +} + func (s *Server) setupEsQuery( q *elastic.BoolQuery, in *pb.SearchRequest, - pageSize *int, - from *int, orderBy *[]orderField) *elastic.BoolQuery { claimTypes := map[string]int{ "stream": 1, @@ -464,14 +489,6 @@ func (s *Server) setupEsQuery( q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling)) } - if in.Limit > 0 { - *pageSize = int(in.Limit) - } - - if in.Offset > 0 { - *from = int(in.Offset) - } - if len(in.ClaimName) > 0 { in.NormalizedName = util.NormalizeName(in.ClaimName) } @@ -723,12 +740,21 @@ func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client internal cache for the hub.
*/ func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string { + // Save the offest / limit and set to zero, cache hits should happen regardless + // and they're used in post processing + //offset, limit := request.Offset, request.Limit + //request.Offset = 0 + //request.Limit = 0 + bytes, err := protojson.Marshal(request) if err != nil { return "" } str := string((*s.S256).Sum(bytes)) - log.Println(str) + // log.Println(str) + //request.Offset = offset + //request.Limit = limit + return str } diff --git a/server/server.go b/server/server.go index d6e68f0..bb34112 100644 --- a/server/server.go +++ b/server/server.go @@ -20,14 +20,17 @@ import ( ) type Server struct { - GrpcServer *grpc.Server - Args *Args - MultiSpaceRe *regexp.Regexp - WeirdCharsRe *regexp.Regexp - EsClient *elastic.Client - Servers []*FederatedServer - QueryCache *ttlcache.Cache - S256 *hash.Hash + GrpcServer *grpc.Server + Args *Args + MultiSpaceRe *regexp.Regexp + WeirdCharsRe *regexp.Regexp + EsClient *elastic.Client + Servers []*FederatedServer + QueryCache *ttlcache.Cache + S256 *hash.Hash + LastRefreshCheck time.Time + RefreshDelta time.Duration + NumESRefreshes int64 pb.UnimplementedHubServer } @@ -134,14 +137,22 @@ func MakeHubServer(args *Args) *Server { log.Fatal(err) } s256 := sha256.New() + var refreshDelta = time.Second * 2 + if args.Debug { + refreshDelta = time.Second * 0 + } + s := &Server{ - GrpcServer: grpcServer, - Args: args, - MultiSpaceRe: multiSpaceRe, - WeirdCharsRe: weirdCharsRe, - EsClient: client, - QueryCache: cache, - S256: &s256, + GrpcServer: grpcServer, + Args: args, + MultiSpaceRe: multiSpaceRe, + WeirdCharsRe: weirdCharsRe, + EsClient: client, + QueryCache: cache, + S256: &s256, + LastRefreshCheck: time.Now(), + RefreshDelta: refreshDelta, + NumESRefreshes: 0, } return s