Query caching #15
1 changed file with 58 additions and 35 deletions
@@ -179,43 +179,61 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
     var pageSize = 10
No, on seconds though, I don't think so. We accept a single ES index as an argument, and in all our use cases I believe we only ever have to deal with one.
Done.
     var orderBy []orderField
     var searchIndices []string
+    var searchResult *elastic.SearchResult = nil
     client := s.EsClient
     searchIndices = make([]string, 0, 1)
     searchIndices = append(searchIndices, s.Args.EsIndex)
 
-    q := elastic.NewBoolQuery()
-
-    err := s.checkQuery(in)
-    if err != nil {
-        return nil, err
-    }
-    q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
-
-    fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
-    search := client.Search().
-        Index(searchIndices...).
-        FetchSourceContext(fsc).
-        Query(q). // specify the query
-        From(0).Size(DefaultSearchSize)
+    cacheHit := false
+    var cachedRecords []*record
+    /*
+        TODO: cache based on search request params
+        include from value and number of results.
+        When another search request comes in with same search params
+        and same or increased offset (which we currently don't even use?)
+        that will be a cache hit.
+    */
+
+    if !cacheHit {
+        q := elastic.NewBoolQuery()
+
+        err := s.checkQuery(in)
+        if err != nil {
+            return nil, err
+        }
+        q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
+
+        fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
+        search := client.Search().
+            Index(searchIndices...).
+            FetchSourceContext(fsc).
+            Query(q). // specify the query
+            From(0).Size(DefaultSearchSize)
ES data should only change if a new block comes in. Maybe in the future we can use that info to purge the cache so we avoid checking the index every 2 seconds.
This check shouldn't cost much, it's essentially just pinging the ES server, but we could definitely set the refresh delta higher, to like 5 or maybe 10 seconds. @shyba we were talking about this over the weekend, any thoughts?
this is fine. we can always bump it up later if it's an issue. no need to worry about it much now
Cool, I parameterized it at least, along with the cache TTL.
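To make the thread concrete, here is a minimal sketch of the cache the TODO and the comments above describe: entries keyed on the search-request params (e.g. a stable serialization of the request with the paging fields zeroed out), holding the window of records fetched from ES, and expiring after a parameterized TTL. The names here (searchCache, cacheEntry, ttl) are illustrative assumptions, not the PR's actual implementation. Because the query always fetches From(0).Size(DefaultSearchSize), a later request with the same params and a same-or-larger offset can be answered by re-slicing the cached window.

    package server

    import (
        "sync"
        "time"
    )

    // record is a stand-in for the server's record type.
    type record struct{}

    // cacheEntry remembers when the records were fetched, so entries
    // older than the TTL count as misses and get refreshed.
    type cacheEntry struct {
        records []*record
        fetched time.Time
    }

    // searchCache maps a key derived from the search params (excluding
    // the offset, per the TODO) to the cached result window.
    type searchCache struct {
        mu  sync.RWMutex
        ttl time.Duration // parameterized, like the refresh delta
        m   map[string]cacheEntry
    }

    func newSearchCache(ttl time.Duration) *searchCache {
        return &searchCache{ttl: ttl, m: make(map[string]cacheEntry)}
    }

    func (c *searchCache) get(key string) ([]*record, bool) {
        c.mu.RLock()
        defer c.mu.RUnlock()
        e, ok := c.m[key]
        if !ok || time.Since(e.fetched) > c.ttl {
            return nil, false // miss: absent or older than the TTL
        }
        return e.records, true
    }

    func (c *searchCache) put(key string, recs []*record) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.m[key] = cacheEntry{records: recs, fetched: time.Now()}
    }

With something like this in place, cacheHit could come from a cache.get(key) call, and the //TODO fill cached records here branch would just assign the cached slice to cachedRecords.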
-    for _, x := range orderBy {
-        search = search.Sort(x.Field, x.IsAsc)
-    }
-
-    searchResult, err := search.Do(ctx) // execute
-    if err != nil && elastic.IsNotFound(err) {
-        log.Println("Index returned 404! Check writer. Index: ", searchIndices)
-        return &pb.Outputs{}, nil
-    } else if err != nil {
-        s.recordErrorAndReturn(err, "search_errors")
-        log.Println("Error executing query: ", err)
-        return nil, err
-    }
-
-    log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
+        for _, x := range orderBy {
+            search = search.Sort(x.Field, x.IsAsc)
+        }
+
+        searchResult, err = search.Do(ctx) // execute
+        if err != nil && elastic.IsNotFound(err) {
+            log.Println("Index returned 404! Check writer. Index: ", searchIndices)
+            return &pb.Outputs{}, nil
+        } else if err != nil {
+            s.recordErrorAndReturn(err, "search_errors")
+            log.Println("Error executing query: ", err)
+            return nil, err
+        }
+
+        log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
+
+        cachedRecords = make([]*record, 0, 0)
+    } else {
+        //TODO fill cached records here
+        cachedRecords = make([]*record, 0, 0)
+    }
 
-    txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
+    txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices, cachedRecords)
 
     t1 := time.Now()
@@ -272,20 +290,25 @@ func (s *Server) postProcessResults(
     in *pb.SearchRequest,
     pageSize int,
     from int,
-    searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
+    searchIndices []string,
+    cachedRecords []*record) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
     var txos []*pb.Output
     var records []*record
     var blockedRecords []*record
     var blocked []*pb.Blocked
     var blockedMap map[string]*pb.Blocked
 
-    records = make([]*record, 0, searchResult.TotalHits())
-
-    var r record
-    for _, item := range searchResult.Each(reflect.TypeOf(r)) {
-        if t, ok := item.(record); ok {
-            records = append(records, &t)
-        }
-    }
+    if len(cachedRecords) == 0 {
+        records = make([]*record, 0, searchResult.TotalHits())
+
+        var r record
+        for _, item := range searchResult.Each(reflect.TypeOf(r)) {
+            if t, ok := item.(record); ok {
+                records = append(records, &t)
+            }
+        }
+    } else {
+        records = cachedRecords
+    }
 
     //printJsonFullResults(searchResult)
@@ -617,7 +640,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct
     return channelTxos, channels
 }
 
-func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
+func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
 
     var totalReposted = 0
     var mget = client.Mget() //.StoredFields("_id")
does this need fixing?
I generally don't like swallowing errors; at the very least you should log it.
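For illustration, a minimal sketch of the pattern being suggested, where someCall and use are hypothetical stand-ins (not code from this PR) for the call whose error is currently discarded and its consumer: if the error can't be propagated, it should at least be logged.

    package server

    import "log"

    // Sketch only: someCall and use are hypothetical stand-ins.
    func example() {
        res, err := someCall()
        if err != nil {
            // not worth failing the whole request over, but don't hide it
            log.Println("someCall failed, continuing without result: ", err)
            return
        }
        use(res)
    }

    func someCall() (int, error) { return 0, nil }
    func use(int)                {}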