Query caching #15

Merged
jeffreypicard merged 6 commits from query-caching into master 2021-10-05 00:05:44 +02:00
8 changed files with 227 additions and 95 deletions

2
dev.sh
View file

@ -3,4 +3,4 @@
hash reflex 2>/dev/null || go get github.com/cespare/reflex hash reflex 2>/dev/null || go get github.com/cespare/reflex
hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; } hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; }
reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev" reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug"

1
go.mod
View file

@ -3,6 +3,7 @@ module github.com/lbryio/hub
go 1.16 go 1.16
require ( require (
github.com/ReneKroon/ttlcache/v2 v2.8.1
github.com/akamensky/argparse v1.2.2 github.com/akamensky/argparse v1.2.2
github.com/btcsuite/btcutil v1.0.2 github.com/btcsuite/btcutil v1.0.2
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57

12
go.sum
View file

@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk=
github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA= github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA=
github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
@ -196,6 +198,8 @@ github.com/ybbus/jsonrpc v0.0.0-20180411222309-2a548b7d822d/go.mod h1:XJrh1eMSzd
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -207,6 +211,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -236,6 +242,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -271,8 +279,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -310,6 +321,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=

30
main.go
View file

@ -18,11 +18,13 @@ import (
) )
const ( const (
defaultHost = "0.0.0.0" defaultHost = "0.0.0.0"
defaultPort = "50051" defaultPort = "50051"
defaultEsHost = "http://localhost" defaultEsHost = "http://localhost"
defaultEsIndex = "claims" defaultEsIndex = "claims"
defaultEsPort = "9200" defaultEsPort = "9200"
defaultRefreshDelta = 5
defaultCacheTTL = 5
) )
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
@ -71,6 +73,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -89,13 +93,15 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
} }
args := &server.Args{ args := &server.Args{
CmdType: server.SearchCmd, CmdType: server.SearchCmd,
Host: *host, Host: *host,
Port: ":" + *port, Port: ":" + *port,
EsHost: *esHost, EsHost: *esHost,
EsPort: *esPort, EsPort: *esPort,
EsIndex: *esIndex, EsIndex: *esIndex,
Debug: *debug, Debug: *debug,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
} }
if esHost, ok := environment["ELASTIC_HOST"]; ok { if esHost, ok := environment["ELASTIC_HOST"]; ok {

View file

@ -48,7 +48,7 @@ message SearchRequest {
InvertibleField claim_id = 1; InvertibleField claim_id = 1;
InvertibleField channel_id = 2; InvertibleField channel_id = 2;
string text = 3; string text = 3;
uint32 limit = 4; int32 limit = 4;
repeated string order_by = 5; repeated string order_by = 5;
uint32 offset = 6; uint32 offset = 6;
bool is_controlling = 7; bool is_controlling = 7;
@ -95,7 +95,7 @@ message SearchRequest {
repeated string not_tags = 51; repeated string not_tags = 51;
bool has_channel_signature = 52; bool has_channel_signature = 52;
BoolValue has_source = 53; BoolValue has_source = 53;
uint32 limit_claims_per_channel = 54; int32 limit_claims_per_channel = 54;
repeated string any_languages = 55; repeated string any_languages = 55;
repeated string all_languages = 56; repeated string all_languages = 56;
bool remove_duplicates = 57; bool remove_duplicates = 57;

View file

@ -372,7 +372,7 @@ type SearchRequest struct {
ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"` ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"`
ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"` ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"`
Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"` Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"`
Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"` Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"` OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"`
Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"` Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"`
IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"` IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"`
@ -419,7 +419,7 @@ type SearchRequest struct {
NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"` NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"`
HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"` HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"`
HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"` HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"`
LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"`
AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"`
RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"` RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"`
@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string {
return "" return ""
} }
func (x *SearchRequest) GetLimit() uint32 { func (x *SearchRequest) GetLimit() int32 {
if x != nil { if x != nil {
return x.Limit return x.Limit
} }
@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue {
return nil return nil
} }
func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 { func (x *SearchRequest) GetLimitClaimsPerChannel() int32 {
if x != nil { if x != nil {
return x.LimitClaimsPerChannel return x.LimitClaimsPerChannel
} }
@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{
0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63, 0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63,
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05,
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d,
0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05, 0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05,
0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a,
0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f,
@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68,
0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69, 0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69,
0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, 0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61,
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65,
0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67,
0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, 0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e,

View file

@ -18,6 +18,7 @@ import (
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
"golang.org/x/text/cases" "golang.org/x/text/cases"
"golang.org/x/text/language" "golang.org/x/text/language"
"google.golang.org/protobuf/encoding/protojson"
"gopkg.in/karalabe/cookiejar.v1/collections/deque" "gopkg.in/karalabe/cookiejar.v1/collections/deque"
) )
@ -179,43 +180,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
var pageSize = 10 var pageSize = 10
var orderBy []orderField var orderBy []orderField
var searchIndices []string var searchIndices []string
var searchResult *elastic.SearchResult = nil
client := s.EsClient client := s.EsClient
searchIndices = make([]string, 0, 1) searchIndices = make([]string, 0, 1)
searchIndices = append(searchIndices, s.Args.EsIndex) searchIndices = append(searchIndices, s.Args.EsIndex)
q := elastic.NewBoolQuery() //Code for debugging locally
//indices, _ := client.IndexNames()
//for _, index := range indices {
// if index != "claims" {
// log.Println(index)
// searchIndices = append(searchIndices, index)
// }
//}
err := s.checkQuery(in) // If it's been more than RefreshDelta time since we last checked if the
if err != nil { // es index has been refreshed, we check (this is 2 seconds in prod,
return nil, err // 0 seconds in debug / unit testing). If the index has been refreshed
} // a different number of times since we last checked, we purge the cache
q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) {
res, err := client.IndexStats(searchIndices[0]).Do(ctx)
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") if err != nil {
search := client.Search(). log.Printf("Error on ES index stats\n%v\n", err)
Index(searchIndices...). }
FetchSourceContext(fsc). numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total
Query(q). // specify the query if numRefreshes != s.NumESRefreshes {
From(0).Size(DefaultSearchSize) _ = s.QueryCache.Purge()
s.NumESRefreshes = numRefreshes
for _, x := range orderBy { }
search = search.Sort(x.Field, x.IsAsc)
} }
lyoshenka commented 2021-10-04 20:57:26 +02:00 (Migrated from github.com)
Review

ES data should only change if a new block comes in. Maybe in the future we can use that info to purge the cache so we avoid checking the index every 2 seconds

ES data should only change if a new block comes in. Maybe in the future we can use that info to purge the cache so we avoid checking the index every 2 seconds
jeffreypicard commented 2021-10-04 21:06:06 +02:00 (Migrated from github.com)
Review

This check shouldn't cost much, it's essentially just pinging the ES server, but we could definitely set the refresh delta higher to like 5 or maybe 10 seconds. @shyba we were talking about this over the weekend, any thoughts?

This check shouldn't cost much, it's essentially just pinging the ES server, but we could definitely set the refresh delta higher to like 5 or maybe 10 seconds. @shyba we were talking about this over the weekend, any thoughts?
lyoshenka commented 2021-10-04 21:13:45 +02:00 (Migrated from github.com)
Review

this is fine. we can always bump it up later if its an issue. no need to worry about i tmuch now

this is fine. we can always bump it up later if its an issue. no need to worry about i tmuch now
jeffreypicard commented 2021-10-05 00:03:25 +02:00 (Migrated from github.com)
Review

Cool, I parameterized it at least along with the cache ttl.

Cool, I parameterized it at least along with the cache ttl.
searchResult, err := search.Do(ctx) // execute var records []*record
if err != nil && elastic.IsNotFound(err) {
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
return &pb.Outputs{}, nil
} else if err != nil { cacheKey := s.serializeSearchRequest(in)
s.recordErrorAndReturn(err, "search_errors")
log.Println("Error executing query: ", err) setPageVars(in, &pageSize, &from)
return nil, err
/*
cache based on search request params
include from value and number of results.
When another search request comes in with same search params
and same or increased offset (which we currently don't even use?)
that will be a cache hit.
FIXME: For now the cache is turned off when in debugging mode
(for unit tests) because it breaks on some of them.
FIXME: Currently the cache just skips the initial search,
the mgets and post processing are still done. There's probably
a more efficient way to store the final result.
*/
if val, err := s.QueryCache.Get(cacheKey); err != nil {
q := elastic.NewBoolQuery()
err := s.checkQuery(in)
if err != nil {
return nil, err
}
q = s.setupEsQuery(q, in, &orderBy)
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
search := client.Search().
Index(searchIndices...).
FetchSourceContext(fsc).
Query(q). // specify the query
From(0).Size(DefaultSearchSize)
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
}
searchResult, err = search.Do(ctx) // execute
if err != nil && elastic.IsNotFound(err) {
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
return &pb.Outputs{}, nil
} else if err != nil {
s.recordErrorAndReturn(err, "search_errors")
log.Println("Error executing query: ", err)
return nil, err
}
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
records = s.searchResultToRecords(searchResult)
err = s.QueryCache.Set(cacheKey, records)
if err != nil {
//FIXME: Should this be fatal?
log.Println("Error storing records in cache: ", err)
}
} else {
records = val.([]*record)
} }
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices)
txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
t1 := time.Now() t1 := time.Now()
@ -265,21 +323,9 @@ func (s *Server) cleanTags(tags []string) []string {
return cleanedTags return cleanedTags
} }
func (s *Server) postProcessResults( func (s *Server) searchResultToRecords(
ctx context.Context, searchResult *elastic.SearchResult) []*record {
client *elastic.Client, records := make([]*record, 0, searchResult.TotalHits())
searchResult *elastic.SearchResult,
in *pb.SearchRequest,
pageSize int,
from int,
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
var txos []*pb.Output
var records []*record
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records = make([]*record, 0, searchResult.TotalHits())
var r record var r record
for _, item := range searchResult.Each(reflect.TypeOf(r)) { for _, item := range searchResult.Each(reflect.TypeOf(r)) {
@ -288,6 +334,22 @@ func (s *Server) postProcessResults(
} }
} }
return records
}
func (s *Server) postProcessResults(
ctx context.Context,
client *elastic.Client,
records []*record,
in *pb.SearchRequest,
pageSize int,
from int,
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
var txos []*pb.Output
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
//printJsonFullResults(searchResult) //printJsonFullResults(searchResult)
records, blockedRecords, blockedMap = removeBlocked(records) records, blockedRecords, blockedMap = removeBlocked(records)
@ -353,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error {
for name, failed := range checks { for name, failed := range checks {
if failed { if failed {
time.Sleep(2) // throttle time.Sleep(2) // throttle
return fmt.Errorf("%s cant have more than %d items", name, limit) return fmt.Errorf("%s cant have more than %d items.", name, limit)
} }
} }
return nil return nil
} }
func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) {
if in.Limit > 0 {
log.Printf("############ limit: %d\n", in.Limit)
*pageSize = int(in.Limit)
}
if in.Offset > 0 {
*from = int(in.Offset)
}
}
func (s *Server) setupEsQuery( func (s *Server) setupEsQuery(
q *elastic.BoolQuery, q *elastic.BoolQuery,
in *pb.SearchRequest, in *pb.SearchRequest,
pageSize *int,
from *int,
orderBy *[]orderField) *elastic.BoolQuery { orderBy *[]orderField) *elastic.BoolQuery {
claimTypes := map[string]int{ claimTypes := map[string]int{
"stream": 1, "stream": 1,
@ -418,14 +489,6 @@ func (s *Server) setupEsQuery(
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling)) q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
} }
if in.Limit > 0 {
*pageSize = int(in.Limit)
}
if in.Offset > 0 {
*from = int(in.Offset)
}
if len(in.ClaimName) > 0 { if len(in.ClaimName) > 0 {
in.NormalizedName = util.NormalizeName(in.ClaimName) in.NormalizedName = util.NormalizeName(in.ClaimName)
} }
@ -617,7 +680,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct
return channelTxos, channels return channelTxos, channels
} }
func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
var totalReposted = 0 var totalReposted = 0
var mget = client.Mget() //.StoredFields("_id") var mget = client.Mget() //.StoredFields("_id")
@ -669,6 +732,29 @@ func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Clien
return claims, repostedRecords, respostedMap return claims, repostedRecords, respostedMap
} }
/*
Takes a search request and serializes into a string for use as a key into the
internal cache for the hub.
*/
func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string {
// Save the offest / limit and set to zero, cache hits should happen regardless
// and they're used in post processing
//offset, limit := request.Offset, request.Limit
//request.Offset = 0
//request.Limit = 0
bytes, err := protojson.Marshal(request)
if err != nil {
return ""
}
str := string((*s.S256).Sum(bytes))
// log.Println(str)
//request.Offset = offset
//request.Limit = limit
return str
}
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
finalHits := make([]*record, 0, len(searchHits)) finalHits := make([]*record, 0, len(searchHits))
var channelCounters map[string]int var channelCounters map[string]int
@ -803,3 +889,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B
return newHits, blockedHits, blockedChannels return newHits, blockedHits, blockedChannels
} }

View file

@ -2,14 +2,16 @@ package server
import ( import (
"context" "context"
"crypto/sha256"
"fmt" "fmt"
"hash"
"log" "log"
"net/http"
"os" "os"
"regexp" "regexp"
"net/http"
"time" "time"
"github.com/ReneKroon/ttlcache/v2"
"github.com/lbryio/hub/meta" "github.com/lbryio/hub/meta"
pb "github.com/lbryio/hub/protobuf/go" pb "github.com/lbryio/hub/protobuf/go"
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
@ -18,12 +20,17 @@ import (
) )
type Server struct { type Server struct {
GrpcServer *grpc.Server GrpcServer *grpc.Server
Args *Args Args *Args
MultiSpaceRe *regexp.Regexp MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp WeirdCharsRe *regexp.Regexp
EsClient *elastic.Client EsClient *elastic.Client
Servers []*FederatedServer Servers []*FederatedServer
QueryCache *ttlcache.Cache
S256 *hash.Hash
LastRefreshCheck time.Time
RefreshDelta time.Duration
NumESRefreshes int64
pb.UnimplementedHubServer pb.UnimplementedHubServer
} }
@ -41,13 +48,15 @@ const (
type Args struct { type Args struct {
// TODO Make command types an enum // TODO Make command types an enum
CmdType int CmdType int
Host string Host string
Port string Port string
EsHost string EsHost string
EsPort string EsPort string
EsIndex string EsIndex string
Debug bool Debug bool
RefreshDelta int
CacheTTL int
} }
func getVersion() string { func getVersion() string {
@ -123,12 +132,29 @@ func MakeHubServer(args *Args) *Server {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
cache := ttlcache.NewCache()
err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute)
if err != nil {
log.Fatal(err)
}
s256 := sha256.New()
var refreshDelta = time.Second * time.Duration(args.RefreshDelta)
if args.Debug {
refreshDelta = time.Second * 0
}
s := &Server{ s := &Server{
GrpcServer: grpcServer, GrpcServer: grpcServer,
Args: args, Args: args,
MultiSpaceRe: multiSpaceRe, MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe, WeirdCharsRe: weirdCharsRe,
EsClient: client, EsClient: client,
QueryCache: cache,
S256: &s256,
LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta,
NumESRefreshes: 0,
} }
return s return s