@ -3,4 +3,4 @@
hash reflex 2>/dev/null || go get github.com/cespare/reflex
hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; }
reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev"
reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug"
@ -3,6 +3,7 @@ module github.com/lbryio/hub
go 1.16
require (
github.com/ReneKroon/ttlcache/v2 v2.8.1
github.com/akamensky/argparse v1.2.2
github.com/btcsuite/btcutil v1.0.2
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57
@ -18,11 +18,13 @@ import (
const (
defaultHost = ""
defaultPort = "50051"
defaultEsHost = "http://localhost"
defaultEsIndex = "claims"
defaultEsPort = "9200"
defaultHost = ""
defaultPort = "50051"
defaultEsHost = "http://localhost"
defaultEsIndex = "claims"
defaultEsPort = "9200"
defaultRefreshDelta = 5
defaultCacheTTL = 5
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
@ -71,6 +73,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -89,13 +93,15 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
args := &server.Args{
CmdType: server.SearchCmd,
Host: *host,
Port: ":" + *port,
EsHost: *esHost,
EsPort: *esPort,
EsIndex: *esIndex,
Debug: *debug,
CmdType: server.SearchCmd,
Host: *host,
Port: ":" + *port,
EsHost: *esHost,
EsPort: *esPort,
EsIndex: *esIndex,
Debug: *debug,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
if esHost, ok := environment["ELASTIC_HOST"]; ok {
@ -48,7 +48,7 @@ message SearchRequest {
InvertibleField claim_id = 1;
InvertibleField channel_id = 2;
string text = 3;
uint32 limit = 4;
int32 limit = 4;
repeated string order_by = 5;
uint32 offset = 6;
bool is_controlling = 7;
@ -95,7 +95,7 @@ message SearchRequest {
repeated string not_tags = 51;
bool has_channel_signature = 52;
BoolValue has_source = 53;
uint32 limit_claims_per_channel = 54;
int32 limit_claims_per_channel = 54;
repeated string any_languages = 55;
repeated string all_languages = 56;
bool remove_duplicates = 57;
@ -372,7 +372,7 @@ type SearchRequest struct {
ClaimId *InvertibleField `protobuf:"bytes,1,opt,name=claim_id,json=claimId,proto3" json:"claim_id"`
ChannelId *InvertibleField `protobuf:"bytes,2,opt,name=channel_id,json=channelId,proto3" json:"channel_id"`
Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text"`
Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit"`
OrderBy []string `protobuf:"bytes,5,rep,name=order_by,json=orderBy,proto3" json:"order_by"`
Offset uint32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset"`
IsControlling bool `protobuf:"varint,7,opt,name=is_controlling,json=isControlling,proto3" json:"is_controlling"`
@ -419,7 +419,7 @@ type SearchRequest struct {
NotTags []string `protobuf:"bytes,51,rep,name=not_tags,json=notTags,proto3" json:"not_tags"`
HasChannelSignature bool `protobuf:"varint,52,opt,name=has_channel_signature,json=hasChannelSignature,proto3" json:"has_channel_signature"`
HasSource *BoolValue `protobuf:"bytes,53,opt,name=has_source,json=hasSource,proto3" json:"has_source"`
LimitClaimsPerChannel uint32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
LimitClaimsPerChannel int32 `protobuf:"varint,54,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
AnyLanguages []string `protobuf:"bytes,55,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"`
AllLanguages []string `protobuf:"bytes,56,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"`
RemoveDuplicates bool `protobuf:"varint,57,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"`
@ -479,7 +479,7 @@ func (x *SearchRequest) GetText() string {
return ""
func (x *SearchRequest) GetLimit() uint32 {
func (x *SearchRequest) GetLimit() int32 {
if x != nil {
return x.Limit
@ -808,7 +808,7 @@ func (x *SearchRequest) GetHasSource() *BoolValue {
return nil
func (x *SearchRequest) GetLimitClaimsPerChannel() uint32 {
func (x *SearchRequest) GetLimitClaimsPerChannel() int32 {
if x != nil {
return x.LimitClaimsPerChannel
@ -876,7 +876,7 @@ var file_hub_proto_rawDesc = []byte{
0x76, 0x65, 0x72, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x09, 0x63,
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05,
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d,
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d,
0x69, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x5f, 0x62, 0x79, 0x18, 0x05,
0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x16, 0x0a,
0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6f,
@ -998,7 +998,7 @@ var file_hub_proto_rawDesc = []byte{
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x09, 0x68,
0x61, 0x73, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x69, 0x6d, 0x69,
0x74, 0x5f, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61,
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x36, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x6c, 0x69, 0x6d, 0x69,
0x74, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x73, 0x50, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65,
0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6e, 0x79, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67,
0x65, 0x73, 0x18, 0x37, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e,
@ -18,6 +18,7 @@ import (
@ -179,43 +180,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
var pageSize = 10
var orderBy []orderField
var searchIndices []string
var searchResult *elastic.SearchResult = nil
client := s.EsClient
searchIndices = make([]string, 0, 1)
searchIndices = append(searchIndices, s.Args.EsIndex)
q := elastic.NewBoolQuery()
//Code for debugging locally
//indices, _ := client.IndexNames()
//for _, index := range indices {
// if index != "claims" {
// log.Println(index)
// searchIndices = append(searchIndices, index)
// }
err := s.checkQuery(in)
if err != nil {
return nil, err
q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
search := client.Search().
Query(q). // specify the query
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
// If it's been more than RefreshDelta time since we last checked if the
// es index has been refreshed, we check (this is 2 seconds in prod,
// 0 seconds in debug / unit testing). If the index has been refreshed
// a different number of times since we last checked, we purge the cache
if time.Now().After(s.LastRefreshCheck.Add(s.RefreshDelta)) {
res, err := client.IndexStats(searchIndices[0]).Do(ctx)
if err != nil {
log.Printf("Error on ES index stats\n%v\n", err)
numRefreshes := res.Indices[searchIndices[0]].Primaries.Refresh.Total
if numRefreshes != s.NumESRefreshes {
_ = s.QueryCache.Purge()
s.NumESRefreshes = numRefreshes
searchResult, err := search.Do(ctx) // execute
if err != nil && elastic.IsNotFound(err) {
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
return &pb.Outputs{}, nil
var records []*record
} else if err != nil {
s.recordErrorAndReturn(err, "search_errors")
log.Println("Error executing query: ", err)
return nil, err
cacheKey := s.serializeSearchRequest(in)
setPageVars(in, &pageSize, &from)
cache based on search request params
include from value and number of results.
When another search request comes in with same search params
and same or increased offset (which we currently don't even use?)
that will be a cache hit.
FIXME: For now the cache is turned off when in debugging mode
(for unit tests) because it breaks on some of them.
FIXME: Currently the cache just skips the initial search,
the mgets and post processing are still done. There's probably
a more efficient way to store the final result.
if val, err := s.QueryCache.Get(cacheKey); err != nil {
q := elastic.NewBoolQuery()
err := s.checkQuery(in)
if err != nil {
return nil, err
q = s.setupEsQuery(q, in, &orderBy)
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
search := client.Search().
Query(q). // specify the query
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
searchResult, err = search.Do(ctx) // execute
if err != nil && elastic.IsNotFound(err) {
log.Println("Index returned 404! Check writer. Index: ", searchIndices)
return &pb.Outputs{}, nil
} else if err != nil {
s.recordErrorAndReturn(err, "search_errors")
log.Println("Error executing query: ", err)
return nil, err
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
records = s.searchResultToRecords(searchResult)
err = s.QueryCache.Set(cacheKey, records)
if err != nil {
//FIXME: Should this be fatal?
log.Println("Error storing records in cache: ", err)
} else {
records = val.([]*record)
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices)
t1 := time.Now()
@ -265,21 +323,9 @@ func (s *Server) cleanTags(tags []string) []string {
return cleanedTags
func (s *Server) postProcessResults(
ctx context.Context,
client *elastic.Client,
searchResult *elastic.SearchResult,
in *pb.SearchRequest,
pageSize int,
from int,
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
var txos []*pb.Output
var records []*record
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records = make([]*record, 0, searchResult.TotalHits())
func (s *Server) searchResultToRecords(
searchResult *elastic.SearchResult) []*record {
records := make([]*record, 0, searchResult.TotalHits())
var r record
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
@ -288,6 +334,22 @@ func (s *Server) postProcessResults(
return records
func (s *Server) postProcessResults(
ctx context.Context,
client *elastic.Client,
records []*record,
in *pb.SearchRequest,
pageSize int,
from int,
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
var txos []*pb.Output
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records, blockedRecords, blockedMap = removeBlocked(records)
@ -353,17 +415,26 @@ func (s *Server) checkQuery(in *pb.SearchRequest) error {
for name, failed := range checks {
if failed {
time.Sleep(2) // throttle
return fmt.Errorf("%s cant have more than %d items", name, limit)
return fmt.Errorf("%s cant have more than %d items.", name, limit)
return nil
func setPageVars(in *pb.SearchRequest, pageSize *int, from *int) {
if in.Limit > 0 {
log.Printf("############ limit: %d\n", in.Limit)
*pageSize = int(in.Limit)
if in.Offset > 0 {
*from = int(in.Offset)
func (s *Server) setupEsQuery(
q *elastic.BoolQuery,
in *pb.SearchRequest,
pageSize *int,
from *int,
orderBy *[]orderField) *elastic.BoolQuery {
claimTypes := map[string]int{
"stream": 1,
@ -418,14 +489,6 @@ func (s *Server) setupEsQuery(
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
if in.Limit > 0 {
*pageSize = int(in.Limit)
if in.Offset > 0 {
*from = int(in.Offset)
if len(in.ClaimName) > 0 {
in.NormalizedName = util.NormalizeName(in.ClaimName)
@ -617,7 +680,7 @@ func (s *Server) getUniqueChannels(records []*record, client *elastic.Client, ct
return channelTxos, channels
func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
var totalReposted = 0
var mget = client.Mget() //.StoredFields("_id")
@ -669,6 +732,29 @@ func (s * Server) getClaimsForReposts(ctx context.Context, client *elastic.Clien
return claims, repostedRecords, respostedMap
Takes a search request and serializes into a string for use as a key into the
internal cache for the hub.
func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string {
// Save the offest / limit and set to zero, cache hits should happen regardless
// and they're used in post processing
//offset, limit := request.Offset, request.Limit
//request.Offset = 0
//request.Limit = 0
bytes, err := protojson.Marshal(request)
if err != nil {
return ""
str := string((*s.S256).Sum(bytes))
// log.Println(str)
//request.Offset = offset
//request.Limit = limit
return str
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
finalHits := make([]*record, 0, len(searchHits))
var channelCounters map[string]int
@ -803,3 +889,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B
return newHits, blockedHits, blockedChannels
@ -2,14 +2,16 @@ package server
import (
pb "github.com/lbryio/hub/protobuf/go"
@ -18,12 +20,17 @@ import (
type Server struct {
GrpcServer *grpc.Server
Args *Args
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
EsClient *elastic.Client
Servers []*FederatedServer
GrpcServer *grpc.Server
Args *Args
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
EsClient *elastic.Client
Servers []*FederatedServer
QueryCache *ttlcache.Cache
S256 *hash.Hash
LastRefreshCheck time.Time
RefreshDelta time.Duration
NumESRefreshes int64
@ -41,13 +48,15 @@ const (
type Args struct {
// TODO Make command types an enum
CmdType int
Host string
Port string
EsHost string
EsPort string
EsIndex string
Debug bool
CmdType int
Host string
Port string
EsHost string
EsPort string
EsIndex string
Debug bool
RefreshDelta int
CacheTTL int
func getVersion() string {
@ -123,12 +132,29 @@ func MakeHubServer(args *Args) *Server {
if err != nil {
cache := ttlcache.NewCache()
err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute)
if err != nil {
s256 := sha256.New()
var refreshDelta = time.Second * time.Duration(args.RefreshDelta)
if args.Debug {
refreshDelta = time.Second * 0
s := &Server{
GrpcServer: grpcServer,
Args: args,
MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe,
EsClient: client,
GrpcServer: grpcServer,
Args: args,
MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe,
EsClient: client,
QueryCache: cache,
S256: &s256,
LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta,
NumESRefreshes: 0,
return s
