reflector.go/reflector/blocklist.go

160 lines
3.5 KiB
Go
Raw Normal View History

2018-09-01 01:50:22 +02:00
package reflector
import (
"encoding/hex"
"encoding/json"
"net/http"
"strconv"
"strings"
2018-09-20 17:24:36 +02:00
"time"
2018-09-01 01:50:22 +02:00
"github.com/lbryio/reflector.go/internal/metrics"
2018-09-20 17:24:36 +02:00
"github.com/lbryio/reflector.go/store"
2018-09-01 01:50:22 +02:00
"github.com/lbryio/reflector.go/wallet"
2019-11-14 01:11:35 +01:00
"github.com/lbryio/lbry.go/v2/extras/errors"
2020-07-01 20:42:23 +02:00
"github.com/lbryio/lbry.go/v2/extras/stop"
2018-09-01 01:50:22 +02:00
2018-09-20 17:24:36 +02:00
log "github.com/sirupsen/logrus"
2018-09-01 01:50:22 +02:00
)
2019-07-31 00:02:32 +02:00
// blocklistURL is the endpoint that blockedSdHashes fetches with an HTTP GET.
// The response is decoded as JSON whose data.outpoints field lists the
// outpoints to block.
const blocklistURL = "https://api.lbry.com/file/list_blocked"
2018-09-01 01:50:22 +02:00
2018-09-20 17:24:36 +02:00
func (s *Server) enableBlocklist(b store.Blocklister) {
2020-07-01 20:42:23 +02:00
walletServers := []string{
"spv25.lbry.com:50001",
"spv26.lbry.com:50001",
"spv19.lbry.com:50001",
"spv14.lbry.com:50001",
}
updateBlocklist(b, walletServers, s.grp.Ch())
2018-09-20 17:24:36 +02:00
t := time.NewTicker(12 * time.Hour)
for {
select {
case <-s.grp.Ch():
return
case <-t.C:
2020-07-01 20:42:23 +02:00
updateBlocklist(b, walletServers, s.grp.Ch())
2018-09-20 17:24:36 +02:00
}
}
2018-09-01 01:50:22 +02:00
}
2020-07-01 20:42:23 +02:00
func updateBlocklist(b store.Blocklister, walletServers []string, stopper stop.Chan) {
log.Debugf("blocklist update starting")
values, err := blockedSdHashes(walletServers, stopper)
2018-09-20 17:24:36 +02:00
if err != nil {
log.Error(err)
return
}
for name, v := range values {
2018-09-20 17:24:36 +02:00
if v.Err != nil {
log.Error(errors.FullTrace(errors.Err("blocklist: %s: %s", name, v.Err)))
2018-09-20 17:24:36 +02:00
continue
}
err = b.Block(v.Value)
if err != nil {
log.Error(err)
}
}
2020-07-01 20:42:23 +02:00
log.Debugf("blocklist update done")
2018-09-20 17:24:36 +02:00
}
2020-07-01 20:42:23 +02:00
func blockedSdHashes(walletServers []string, stopper stop.Chan) (map[string]valOrErr, error) {
client := http.Client{Timeout: 1 * time.Second}
resp, err := client.Get(blocklistURL)
2018-09-01 01:50:22 +02:00
if err != nil {
return nil, errors.Err(err)
}
2018-09-24 16:31:14 +02:00
defer func() {
err := resp.Body.Close()
if err != nil {
2020-02-26 16:02:53 +01:00
log.Errorln(errors.Err(err))
2018-09-24 16:31:14 +02:00
}
}()
2018-09-01 01:50:22 +02:00
2018-09-20 17:24:36 +02:00
var r struct {
Success bool `json:"success"`
Error string `json:"error"`
Data struct {
Outpoints []string `json:"outpoints"`
} `json:"data"`
}
2018-09-01 01:50:22 +02:00
if err = json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, errors.Err(err)
}
if !r.Success {
return nil, errors.Prefix("list_blocked API call", r.Error)
}
2020-07-01 20:42:23 +02:00
return sdHashesForOutpoints(walletServers, r.Data.Outpoints, stopper)
2018-09-11 13:41:29 +02:00
}
2018-09-01 01:50:22 +02:00
2018-09-20 20:24:30 +02:00
// valOrErr is the per-outpoint result produced by sdHashesForOutpoints: either
// a value or the error that prevented computing it.
type valOrErr struct {
	// Value is the hex-encoded sd hash; it is left empty when Err is set.
	Value string
	// Err is non-nil when the outpoint was malformed or its claim could not be fetched.
	Err error
}
2018-09-11 13:41:29 +02:00
// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints
2020-07-01 20:42:23 +02:00
func sdHashesForOutpoints(walletServers, outpoints []string, stopper stop.Chan) (map[string]valOrErr, error) {
2018-09-20 20:24:30 +02:00
values := make(map[string]valOrErr)
2018-09-01 01:50:22 +02:00
node := wallet.NewNode()
2020-07-01 20:42:23 +02:00
err := node.Connect(walletServers, nil)
2018-09-01 01:50:22 +02:00
if err != nil {
2020-02-26 16:02:53 +01:00
return nil, errors.Err(err)
2018-09-01 01:50:22 +02:00
}
2020-07-01 20:42:23 +02:00
done := make(chan bool)
metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Inc()
2020-07-01 20:42:23 +02:00
go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Dec()
2020-07-01 20:42:23 +02:00
select {
case <-done:
case <-stopper:
}
node.Shutdown()
}()
OutpointLoop:
2018-09-11 13:41:29 +02:00
for _, outpoint := range outpoints {
2020-07-01 20:42:23 +02:00
select {
case <-stopper:
break OutpointLoop
default:
}
2018-09-11 13:41:29 +02:00
parts := strings.Split(outpoint, ":")
if len(parts) != 2 {
2018-09-20 20:24:30 +02:00
values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")}
2018-09-11 13:41:29 +02:00
continue
}
nout, err := strconv.Atoi(parts[1])
if err != nil {
2018-09-20 20:24:30 +02:00
values[outpoint] = valOrErr{Err: errors.Prefix("invalid nout", err)}
2018-09-11 13:41:29 +02:00
continue
}
claim, err := node.GetClaimInTx(parts[0], nout)
2018-09-11 13:41:29 +02:00
if err != nil {
2018-09-20 20:24:30 +02:00
values[outpoint] = valOrErr{Err: err}
2018-09-11 13:41:29 +02:00
continue
}
hash := hex.EncodeToString(claim.GetStream().GetSource().GetSdHash())
values[outpoint] = valOrErr{Value: hash, Err: nil}
2018-09-01 01:50:22 +02:00
}
2020-07-01 20:42:23 +02:00
select {
case done <- true:
default: // in case of race where stopper got stopped right after loop finished
}
2018-09-11 13:41:29 +02:00
return values, nil
2018-09-01 01:50:22 +02:00
}