bug fixes and cache purging
This commit is contained in:
parent
36b4a3cdd9
commit
0d59480f3c
5 changed files with 76 additions and 39 deletions
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.16
|
|||
require (
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1
|
||||
github.com/akamensky/argparse v1.2.2
|
||||
github.com/btcsuite/btcutil v1.0.2 // indirect
|
||||
github.com/btcsuite/btcutil v1.0.2
|
||||
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57
|
||||
github.com/olivere/elastic/v7 v7.0.24
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
|
|
|
@ -48,7 +48,7 @@ message SearchRequest {
|
|||
InvertibleField claim_id = 1;
|
||||
InvertibleField channel_id = 2;
|
||||
string text = 3;
|
||||
uint32 limit = 4;
|
||||
int32 limit = 4;
|
||||
repeated string order_by = 5;
|
||||
uint32 offset = 6;
|
||||
bool is_controlling = 7;
|
||||
|
@ -95,7 +95,7 @@ message SearchRequest {
|
|||
repeated string not_tags = 51;
|
||||
bool has_channel_signature = 52;
|
||||
BoolValue has_source = 53;
|
||||
uint32 limit_claims_per_channel = 54;
|
||||
int32 limit_claims_per_channel = 54;
|
||||
repeated string any_languages = 55;
|
||||
repeated string all_languages = 56;
|
||||
bool remove_duplicates = 57;
|
||||
|
|
|
@ -372,7 +372,7 @@ type SearchRequest struct {
|
|||
ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"`
|
||||
ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"`
|
||||
Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"`
|
||||
Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
|
||||
Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
|
||||
OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"`
|
||||
Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"`
|
||||
IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"`
|
||||
|
@ -419,7 +419,7 @@ type SearchRequest struct {
|
|||
NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"`
|
||||
HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"`
|
||||
HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"`
|
||||
LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
|
||||
LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
|
||||
AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"`
|
||||
AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"`
|
||||
RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"`
|
||||
|
@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func (x *SearchRequest) GetLimit() uint32 {
|
||||
func (x *SearchRequest) GetLimit() int32 {
|
||||
if x != nil {
|
||||
return x.Limit
|
||||
}
|
||||
|
@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 {
|
||||
func (x *SearchRequest) GetLimitClaimsPerChannel() int32 {
|
||||
if x != nil {
|
||||
return x.LimitClaimsPerChannel
|
||||
}
|
||||
|
@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{
|
|||
0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63,
|
||||
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74,
|
||||
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05,
|
||||
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d,
|
||||
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d,
|
||||
0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05,
|
||||
0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a,
|
||||
0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f,
|
||||
|
@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{
|
|||
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68,
|
||||
0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61,
|
||||
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65,
|
||||
0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67,
|
||||
0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e,
|
||||
|
|
|
@ -187,6 +187,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
searchIndices = make([]string, 0, 1)
|
||||
//searchIndices = append(searchIndices, s.Args.EsIndex)
|
||||
|
||||
//Code for debugging locally
|
||||
indices, _ := client.IndexNames()
|
||||
for _, index := range indices {
|
||||
if index != "claims" {
|
||||
|
@ -195,11 +196,26 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
}
|
||||
}
|
||||
|
||||
//cacheHit := false
|
||||
// If it's been more than RefreshDelta time since we last checked if the
|
||||
// es index has been refreshed, we check (this is 2 seconds in prod,
|
||||
// 0 seconds in debug / unit testing). If the index has been refreshed
|
||||
// a different number of times since we last checked, we purge the cache
|
||||
if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) {
|
||||
// FIXME: Should this be on all indices
|
||||
res, _ := client.IndexStats(searchIndices[0]).Do(ctx)
|
||||
numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total
|
||||
if numRefreshes != s.NumESRefreshes {
|
||||
_ = s.QueryCache.Purge()
|
||||
s.NumESRefreshes = numRefreshes
|
||||
}
|
||||
}
|
||||
|
||||
var records []*record
|
||||
|
||||
cacheKey := s.serializeSearchRequest(in)
|
||||
|
||||
setPageVars(in, &pageSize, &from)
|
||||
|
||||
/*
|
||||
cache based on search request params
|
||||
include from value and number of results.
|
||||
|
@ -213,7 +229,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
a more efficient way to store the final result.
|
||||
*/
|
||||
|
||||
if val, err := s.QueryCache.Get(cacheKey); err != nil || s.Args.Debug {
|
||||
if val, err := s.QueryCache.Get(cacheKey); err != nil {
|
||||
|
||||
q := elastic.NewBoolQuery()
|
||||
|
||||
|
@ -221,7 +237,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
|
||||
q = s.setupEsQuery(q, in, &orderBy)
|
||||
|
||||
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
|
||||
search := client.Search().
|
||||
|
@ -399,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error {
|
|||
for name, failed := range checks {
|
||||
if failed {
|
||||
time.Sleep(2) // throttle
|
||||
return fmt.Errorf("%s cant have more than %d items", name, limit)
|
||||
return fmt.Errorf("%s cant have more than %d items.", name, limit)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) {
|
||||
if in.Limit > 0 {
|
||||
log.Printf("############ limit: %d\n", in.Limit)
|
||||
*pageSize = int(in.Limit)
|
||||
}
|
||||
|
||||
if in.Offset > 0 {
|
||||
*from = int(in.Offset)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) setupEsQuery(
|
||||
q *elastic.BoolQuery,
|
||||
in *pb.SearchRequest,
|
||||
pageSize *int,
|
||||
from *int,
|
||||
orderBy *[]orderField) *elastic.BoolQuery {
|
||||
claimTypes := map[string]int{
|
||||
"stream": 1,
|
||||
|
@ -464,14 +489,6 @@ func (s *Server) setupEsQuery(
|
|||
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
|
||||
}
|
||||
|
||||
if in.Limit > 0 {
|
||||
*pageSize = int(in.Limit)
|
||||
}
|
||||
|
||||
if in.Offset > 0 {
|
||||
*from = int(in.Offset)
|
||||
}
|
||||
|
||||
if len(in.ClaimName) > 0 {
|
||||
in.NormalizedName = util.NormalizeName(in.ClaimName)
|
||||
}
|
||||
|
@ -723,12 +740,21 @@ func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client
|
|||
internal cache for the hub.
|
||||
*/
|
||||
func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string {
|
||||
// Save the offest / limit and set to zero, cache hits should happen regardless
|
||||
// and they're used in post processing
|
||||
//offset, limit := request.Offset, request.Limit
|
||||
//request.Offset = 0
|
||||
//request.Limit = 0
|
||||
|
||||
bytes, err := protojson.Marshal(request)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
str := string((*s.S256).Sum(bytes))
|
||||
log.Println(str)
|
||||
// log.Println(str)
|
||||
//request.Offset = offset
|
||||
//request.Limit = limit
|
||||
|
||||
return str
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,9 @@ type Server struct {
|
|||
Servers []*FederatedServer
|
||||
QueryCache *ttlcache.Cache
|
||||
S256 *hash.Hash
|
||||
LastRefreshCheck time.Time
|
||||
RefreshDelta time.Duration
|
||||
NumESRefreshes int64
|
||||
pb.UnimplementedHubServer
|
||||
}
|
||||
|
||||
|
@ -134,6 +137,11 @@ func MakeHubServer(args *Args) *Server {
|
|||
log.Fatal(err)
|
||||
}
|
||||
s256 := sha256.New()
|
||||
var refreshDelta = time.Second * 2
|
||||
if args.Debug {
|
||||
refreshDelta = time.Second * 0
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
GrpcServer: grpcServer,
|
||||
Args: args,
|
||||
|
@ -142,6 +150,9 @@ func MakeHubServer(args *Args) *Server {
|
|||
EsClient: client,
|
||||
QueryCache: cache,
|
||||
S256: &s256,
|
||||
LastRefreshCheck: time.Now(),
|
||||
RefreshDelta: refreshDelta,
|
||||
NumESRefreshes: 0,
|
||||
}
|
||||
|
||||
return s
|
||||
|
|
Loading…
Reference in a new issue