commit
0d51cbfde4
8 changed files with 227 additions and 95 deletions
2
dev.sh
2
dev.sh
|
@ -3,4 +3,4 @@
|
|||
hash reflex 2>/dev/null || go get github.com/cespare/reflex
|
||||
hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; }
|
||||
|
||||
reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev"
|
||||
reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug"
|
||||
|
|
1
go.mod
1
go.mod
|
@ -3,6 +3,7 @@ module github.com/lbryio/hub
|
|||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1
|
||||
github.com/akamensky/argparse v1.2.2
|
||||
github.com/btcsuite/btcutil v1.0.2
|
||||
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57
|
||||
|
|
12
go.sum
12
go.sum
|
@ -1,6 +1,8 @@
|
|||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk=
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY=
|
||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||
github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA=
|
||||
github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
||||
|
@ -196,6 +198,8 @@ github.com/ybbus/jsonrpc v0.0.0-20180411222309-2a548b7d822d/go.mod h1:XJrh1eMSzd
|
|||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
|
@ -207,6 +211,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
|
|||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
|
@ -236,6 +242,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
@ -271,8 +279,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3
|
|||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -310,6 +321,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
|
|||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
|
|
30
main.go
30
main.go
|
@ -18,11 +18,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultHost = "0.0.0.0"
|
||||
defaultPort = "50051"
|
||||
defaultEsHost = "http://localhost"
|
||||
defaultEsIndex = "claims"
|
||||
defaultEsPort = "9200"
|
||||
defaultHost = "0.0.0.0"
|
||||
defaultPort = "50051"
|
||||
defaultEsHost = "http://localhost"
|
||||
defaultEsIndex = "claims"
|
||||
defaultEsPort = "9200"
|
||||
defaultRefreshDelta = 5
|
||||
defaultCacheTTL = 5
|
||||
)
|
||||
|
||||
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
|
||||
|
@ -71,6 +73,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
|
|||
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost})
|
||||
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort})
|
||||
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex})
|
||||
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta})
|
||||
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL})
|
||||
|
||||
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
|
||||
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
|
||||
|
@ -89,13 +93,15 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
|
|||
}
|
||||
|
||||
args := &server.Args{
|
||||
CmdType: server.SearchCmd,
|
||||
Host: *host,
|
||||
Port: ":" + *port,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
EsIndex: *esIndex,
|
||||
Debug: *debug,
|
||||
CmdType: server.SearchCmd,
|
||||
Host: *host,
|
||||
Port: ":" + *port,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
EsIndex: *esIndex,
|
||||
Debug: *debug,
|
||||
RefreshDelta: *refreshDelta,
|
||||
CacheTTL: *cacheTTL,
|
||||
}
|
||||
|
||||
if esHost, ok := environment["ELASTIC_HOST"]; ok {
|
||||
|
|
|
@ -48,7 +48,7 @@ message SearchRequest {
|
|||
InvertibleField claim_id = 1;
|
||||
InvertibleField channel_id = 2;
|
||||
string text = 3;
|
||||
uint32 limit = 4;
|
||||
int32 limit = 4;
|
||||
repeated string order_by = 5;
|
||||
uint32 offset = 6;
|
||||
bool is_controlling = 7;
|
||||
|
@ -95,7 +95,7 @@ message SearchRequest {
|
|||
repeated string not_tags = 51;
|
||||
bool has_channel_signature = 52;
|
||||
BoolValue has_source = 53;
|
||||
uint32 limit_claims_per_channel = 54;
|
||||
int32 limit_claims_per_channel = 54;
|
||||
repeated string any_languages = 55;
|
||||
repeated string all_languages = 56;
|
||||
bool remove_duplicates = 57;
|
||||
|
|
|
@ -372,7 +372,7 @@ type SearchRequest struct {
|
|||
ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"`
|
||||
ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"`
|
||||
Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"`
|
||||
Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
|
||||
Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
|
||||
OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"`
|
||||
Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"`
|
||||
IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"`
|
||||
|
@ -419,7 +419,7 @@ type SearchRequest struct {
|
|||
NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"`
|
||||
HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"`
|
||||
HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"`
|
||||
LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
|
||||
LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
|
||||
AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"`
|
||||
AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"`
|
||||
RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"`
|
||||
|
@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func (x *SearchRequest) GetLimit() uint32 {
|
||||
func (x *SearchRequest) GetLimit() int32 {
|
||||
if x != nil {
|
||||
return x.Limit
|
||||
}
|
||||
|
@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 {
|
||||
func (x *SearchRequest) GetLimitClaimsPerChannel() int32 {
|
||||
if x != nil {
|
||||
return x.LimitClaimsPerChannel
|
||||
}
|
||||
|
@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{
|
|||
0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63,
|
||||
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74,
|
||||
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05,
|
||||
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d,
|
||||
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d,
|
||||
0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05,
|
||||
0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a,
|
||||
0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f,
|
||||
|
@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{
|
|||
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68,
|
||||
0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61,
|
||||
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
|
||||
0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65,
|
||||
0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67,
|
||||
0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e,
|
||||
|
|
195
server/search.go
195
server/search.go
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/olivere/elastic/v7"
|
||||
"golang.org/x/text/cases"
|
||||
"golang.org/x/text/language"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"gopkg.in/karalabe/cookiejar.v1/collections/deque"
|
||||
)
|
||||
|
||||
|
@ -179,43 +180,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
var pageSize = 10
|
||||
var orderBy []orderField
|
||||
var searchIndices []string
|
||||
var searchResult *elastic.SearchResult = nil
|
||||
client := s.EsClient
|
||||
searchIndices = make([]string, 0, 1)
|
||||
searchIndices = append(searchIndices, s.Args.EsIndex)
|
||||
|
||||
q := elastic.NewBoolQuery()
|
||||
//Code for debugging locally
|
||||
//indices, _ := client.IndexNames()
|
||||
//for _, index := range indices {
|
||||
// if index != "claims" {
|
||||
// log.Println(index)
|
||||
// searchIndices = append(searchIndices, index)
|
||||
// }
|
||||
//}
|
||||
|
||||
err := s.checkQuery(in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
|
||||
|
||||
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
|
||||
search := client.Search().
|
||||
Index(searchIndices...).
|
||||
FetchSourceContext(fsc).
|
||||
Query(q). // specify the query
|
||||
From(0).Size(DefaultSearchSize)
|
||||
|
||||
for _, x := range orderBy {
|
||||
search = search.Sort(x.Field, x.IsAsc)
|
||||
// If it's been more than RefreshDelta time since we last checked if the
|
||||
// es index has been refreshed, we check (this is 2 seconds in prod,
|
||||
// 0 seconds in debug / unit testing). If the index has been refreshed
|
||||
// a different number of times since we last checked, we purge the cache
|
||||
if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) {
|
||||
res, err := client.IndexStats(searchIndices[0]).Do(ctx)
|
||||
if err != nil {
|
||||
log.Printf("Error on ES index stats\n%v\n", err)
|
||||
}
|
||||
numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total
|
||||
if numRefreshes != s.NumESRefreshes {
|
||||
_ = s.QueryCache.Purge()
|
||||
s.NumESRefreshes = numRefreshes
|
||||
}
|
||||
}
|
||||
|
||||
searchResult, err := search.Do(ctx) // execute
|
||||
if err != nil && elastic.IsNotFound(err) {
|
||||
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
|
||||
return &pb.Outputs{}, nil
|
||||
var records []*record
|
||||
|
||||
} else if err != nil {
|
||||
s.recordErrorAndReturn(err, "search_errors")
|
||||
log.Println("Error executing query: ", err)
|
||||
return nil, err
|
||||
cacheKey := s.serializeSearchRequest(in)
|
||||
|
||||
setPageVars(in, &pageSize, &from)
|
||||
|
||||
/*
|
||||
cache based on search request params
|
||||
include from value and number of results.
|
||||
When another search request comes in with same search params
|
||||
and same or increased offset (which we currently don't even use?)
|
||||
that will be a cache hit.
|
||||
FIXME: For now the cache is turned off when in debugging mode
|
||||
(for unit tests) because it breaks on some of them.
|
||||
FIXME: Currently the cache just skips the initial search,
|
||||
the mgets and post processing are still done. There's probably
|
||||
a more efficient way to store the final result.
|
||||
*/
|
||||
|
||||
if val, err := s.QueryCache.Get(cacheKey); err != nil {
|
||||
|
||||
q := elastic.NewBoolQuery()
|
||||
|
||||
err := s.checkQuery(in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q = s.setupEsQuery(q, in, &orderBy)
|
||||
|
||||
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
|
||||
search := client.Search().
|
||||
Index(searchIndices...).
|
||||
FetchSourceContext(fsc).
|
||||
Query(q). // specify the query
|
||||
From(0).Size(DefaultSearchSize)
|
||||
|
||||
for _, x := range orderBy {
|
||||
search = search.Sort(x.Field, x.IsAsc)
|
||||
}
|
||||
|
||||
searchResult, err = search.Do(ctx) // execute
|
||||
if err != nil && elastic.IsNotFound(err) {
|
||||
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
|
||||
return &pb.Outputs{}, nil
|
||||
|
||||
} else if err != nil {
|
||||
s.recordErrorAndReturn(err, "search_errors")
|
||||
log.Println("Error executing query: ", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
|
||||
|
||||
records = s.searchResultToRecords(searchResult)
|
||||
err = s.QueryCache.Set(cacheKey, records)
|
||||
if err != nil {
|
||||
//FIXME: Should this be fatal?
|
||||
log.Println("Error storing records in cache: ", err)
|
||||
}
|
||||
} else {
|
||||
records = val.([]*record)
|
||||
}
|
||||
|
||||
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
|
||||
|
||||
txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
|
||||
txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices)
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
|
@ -265,21 +323,9 @@ func (s *Server) cleanTags(tags []string) []string {
|
|||
return cleanedTags
|
||||
}
|
||||
|
||||
func (s *Server) postProcessResults(
|
||||
ctx context.Context,
|
||||
client *elastic.Client,
|
||||
searchResult *elastic.SearchResult,
|
||||
in *pb.SearchRequest,
|
||||
pageSize int,
|
||||
from int,
|
||||
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
|
||||
var txos []*pb.Output
|
||||
var records []*record
|
||||
var blockedRecords []*record
|
||||
var blocked []*pb.Blocked
|
||||
var blockedMap map[string]*pb.Blocked
|
||||
|
||||
records = make([]*record, 0, searchResult.TotalHits())
|
||||
func (s *Server) searchResultToRecords(
|
||||
searchResult *elastic.SearchResult) []*record {
|
||||
records := make([]*record, 0, searchResult.TotalHits())
|
||||
|
||||
var r record
|
||||
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
|
||||
|
@ -288,6 +334,22 @@ func (s *Server) postProcessResults(
|
|||
}
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
func (s *Server) postProcessResults(
|
||||
ctx context.Context,
|
||||
client *elastic.Client,
|
||||
records []*record,
|
||||
in *pb.SearchRequest,
|
||||
pageSize int,
|
||||
from int,
|
||||
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
|
||||
var txos []*pb.Output
|
||||
var blockedRecords []*record
|
||||
var blocked []*pb.Blocked
|
||||
var blockedMap map[string]*pb.Blocked
|
||||
|
||||
//printJsonFullResults(searchResult)
|
||||
records, blockedRecords, blockedMap = removeBlocked(records)
|
||||
|
||||
|
@ -353,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error {
|
|||
for name, failed := range checks {
|
||||
if failed {
|
||||
time.Sleep(2) // throttle
|
||||
return fmt.Errorf("%s cant have more than %d items", name, limit)
|
||||
return fmt.Errorf("%s cant have more than %d items.", name, limit)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) {
|
||||
if in.Limit > 0 {
|
||||
log.Printf("############ limit: %d\n", in.Limit)
|
||||
*pageSize = int(in.Limit)
|
||||
}
|
||||
|
||||
if in.Offset > 0 {
|
||||
*from = int(in.Offset)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) setupEsQuery(
|
||||
q *elastic.BoolQuery,
|
||||
in *pb.SearchRequest,
|
||||
pageSize *int,
|
||||
from *int,
|
||||
orderBy *[]orderField) *elastic.BoolQuery {
|
||||
claimTypes := map[string]int{
|
||||
"stream": 1,
|
||||
|
@ -418,14 +489,6 @@ func (s *Server) setupEsQuery(
|
|||
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
|
||||
}
|
||||
|
||||
if in.Limit > 0 {
|
||||
*pageSize = int(in.Limit)
|
||||
}
|
||||
|
||||
if in.Offset > 0 {
|
||||
*from = int(in.Offset)
|
||||
}
|
||||
|
||||
if len(in.ClaimName) > 0 {
|
||||
in.NormalizedName = util.NormalizeName(in.ClaimName)
|
||||
}
|
||||
|
@ -617,7 +680,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct
|
|||
return channelTxos, channels
|
||||
}
|
||||
|
||||
func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
|
||||
func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
|
||||
|
||||
var totalReposted = 0
|
||||
var mget = client.Mget() //.StoredFields("_id")
|
||||
|
@ -669,6 +732,29 @@ func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Clien
|
|||
return claims, repostedRecords, respostedMap
|
||||
}
|
||||
|
||||
/*
|
||||
Takes a search request and serializes into a string for use as a key into the
|
||||
internal cache for the hub.
|
||||
*/
|
||||
func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string {
|
||||
// Save the offest / limit and set to zero, cache hits should happen regardless
|
||||
// and they're used in post processing
|
||||
//offset, limit := request.Offset, request.Limit
|
||||
//request.Offset = 0
|
||||
//request.Limit = 0
|
||||
|
||||
bytes, err := protojson.Marshal(request)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
str := string((*s.S256).Sum(bytes))
|
||||
// log.Println(str)
|
||||
//request.Offset = offset
|
||||
//request.Limit = limit
|
||||
|
||||
return str
|
||||
}
|
||||
|
||||
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
|
||||
finalHits := make([]*record, 0, len(searchHits))
|
||||
var channelCounters map[string]int
|
||||
|
@ -803,3 +889,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B
|
|||
|
||||
return newHits, blockedHits, blockedChannels
|
||||
}
|
||||
|
||||
|
|
|
@ -2,14 +2,16 @@ package server
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"hash"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ReneKroon/ttlcache/v2"
|
||||
"github.com/lbryio/hub/meta"
|
||||
pb "github.com/lbryio/hub/protobuf/go"
|
||||
"github.com/olivere/elastic/v7"
|
||||
|
@ -18,12 +20,17 @@ import (
|
|||
)
|
||||
|
||||
type Server struct {
|
||||
GrpcServer *grpc.Server
|
||||
Args *Args
|
||||
MultiSpaceRe *regexp.Regexp
|
||||
WeirdCharsRe *regexp.Regexp
|
||||
EsClient *elastic.Client
|
||||
Servers []*FederatedServer
|
||||
GrpcServer *grpc.Server
|
||||
Args *Args
|
||||
MultiSpaceRe *regexp.Regexp
|
||||
WeirdCharsRe *regexp.Regexp
|
||||
EsClient *elastic.Client
|
||||
Servers []*FederatedServer
|
||||
QueryCache *ttlcache.Cache
|
||||
S256 *hash.Hash
|
||||
LastRefreshCheck time.Time
|
||||
RefreshDelta time.Duration
|
||||
NumESRefreshes int64
|
||||
pb.UnimplementedHubServer
|
||||
}
|
||||
|
||||
|
@ -41,13 +48,15 @@ const (
|
|||
|
||||
type Args struct {
|
||||
// TODO Make command types an enum
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
EsHost string
|
||||
EsPort string
|
||||
EsIndex string
|
||||
Debug bool
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
EsHost string
|
||||
EsPort string
|
||||
EsIndex string
|
||||
Debug bool
|
||||
RefreshDelta int
|
||||
CacheTTL int
|
||||
}
|
||||
|
||||
func getVersion() string {
|
||||
|
@ -123,12 +132,29 @@ func MakeHubServer(args *Args) *Server {
|
|||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
cache := ttlcache.NewCache()
|
||||
err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
s256 := sha256.New()
|
||||
var refreshDelta = time.Second * time.Duration(args.RefreshDelta)
|
||||
if args.Debug {
|
||||
refreshDelta = time.Second * 0
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
GrpcServer: grpcServer,
|
||||
Args: args,
|
||||
MultiSpaceRe: multiSpaceRe,
|
||||
WeirdCharsRe: weirdCharsRe,
|
||||
EsClient: client,
|
||||
GrpcServer: grpcServer,
|
||||
Args: args,
|
||||
MultiSpaceRe: multiSpaceRe,
|
||||
WeirdCharsRe: weirdCharsRe,
|
||||
EsClient: client,
|
||||
QueryCache: cache,
|
||||
S256: &s256,
|
||||
LastRefreshCheck: time.Now(),
|
||||
RefreshDelta: refreshDelta,
|
||||
NumESRefreshes: 0,
|
||||
}
|
||||
|
||||
return s
|
||||
|
|
Loading…
Add table
Reference in a new issue