package cmd

import (
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/lbryio/lbry.go/v2/extras/errors"
	"github.com/lbryio/lbry.go/v2/extras/stop"
	"github.com/lbryio/reflector.go/db"
	"github.com/lbryio/reflector.go/internal/metrics"
	"github.com/lbryio/reflector.go/meta"
	"github.com/lbryio/reflector.go/peer"
	"github.com/lbryio/reflector.go/peer/http3"
	"github.com/lbryio/reflector.go/reflector"
	"github.com/lbryio/reflector.go/server/http"
	"github.com/lbryio/reflector.go/store"

	"github.com/lbryio/lbry.go/v2/stream"

	"github.com/c2h5oh/datasize"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
)

var (
	tcpPeerPort                 int
	http3PeerPort               int
	httpPort                    int
	receiverPort                int
	metricsPort                 int
	disableUploads              bool
	disableBlocklist            bool
	proxyAddress                string
	proxyPort                   string
	proxyProtocol               string
	useDB                       bool
	cloudFrontEndpoint          string
	WasabiEndpoint              string
	reflectorCmdDiskCache       string
	bufferReflectorCmdDiskCache string
	reflectorCmdMemCache        int
	requestQueueSize            int
)
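
// init registers the reflector command and all of its flags on the root
// command.
//
// Example flags (invoked via the parent binary; the flag names are taken from
// the registrations below, the values are purely illustrative):
//
//	reflector --receiver-port=5566 --tcp-peer-port=5567 \
//		--disk-cache=500GB:/mnt/reflector-cache --mem-cache=1000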
func init() {
	var cmd = &cobra.Command{
		Use:   "reflector",
		Short: "Run reflector server",
		Run:   reflectorCmd,
	}
	cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
	cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
	cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
	cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval")
	cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval")
	cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
	cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
	cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol")
	cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
	cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
	cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests should be submitted to upstream")
	cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
	cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
	cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
	cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
		"enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
	cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "",
		"enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
	cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
	rootCmd.AddCommand(cmd)
}
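
// reflectorCmd is the entry point for the reflector command. It starts the
// upload receiver (unless disabled) plus the TCP peer, HTTP3, HTTP, and
// metrics servers, then blocks until it receives SIGINT or SIGTERM.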
func reflectorCmd(cmd *cobra.Command, args []string) {
	log.Printf("reflector %s", meta.VersionString())

	cleanerStopper := stop.New()

	// the blocklist logic requires the db-backed store to be the outermost store
	underlyingStore := setupStore()
	outerStore := wrapWithCache(underlyingStore, cleanerStopper)

	if !disableUploads {
		reflectorServer := reflector.NewServer(underlyingStore, outerStore)
		reflectorServer.Timeout = 3 * time.Minute
		reflectorServer.EnableBlocklist = !disableBlocklist

		err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
		if err != nil {
			log.Fatal(err)
		}
		defer reflectorServer.Shutdown()
	}
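
	// every download-facing server below serves blobs from the cache-wrapped store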
	peerServer := peer.NewServer(outerStore)
	err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
	if err != nil {
		log.Fatal(err)
	}
	defer peerServer.Shutdown()

	http3PeerServer := http3.NewServer(outerStore, requestQueueSize)
	err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
	if err != nil {
		log.Fatal(err)
	}
	defer http3PeerServer.Shutdown()

	httpServer := http.NewServer(outerStore)
	err = httpServer.Start(":" + strconv.Itoa(httpPort))
	if err != nil {
		log.Fatal(err)
	}
	defer httpServer.Shutdown()

	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
	metricsServer.Start()
	defer metricsServer.Shutdown()
	defer outerStore.Shutdown()
	defer underlyingStore.Shutdown()

	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
	<-interruptChan
	// deferred shutdowns happen now
	cleanerStopper.StopAndWait()
}
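
// setupStore builds the origin blob store: either a proxy to another
// reflector server (tcp/http3/http) or an S3/CloudFront-backed store, and
// optionally wraps it in a db-backed store for access tracking.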
func setupStore() store.BlobStore {
	var s store.BlobStore

	if proxyAddress != "" {
		switch proxyProtocol {
		case "tcp":
			s = peer.NewStore(peer.StoreOpts{
				Address: proxyAddress + ":" + proxyPort,
				Timeout: 30 * time.Second,
			})
		case "http3":
			s = http3.NewStore(http3.StoreOpts{
				Address: proxyAddress + ":" + proxyPort,
				Timeout: 30 * time.Second,
			})
		case "http":
			s = store.NewHttpStore(proxyAddress + ":" + proxyPort)
		default:
			log.Fatalf("protocol is not recognized: %s", proxyProtocol)
		}
	} else {
		var s3Store *store.S3Store
		if conf != "none" {
			s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
		}
		if cloudFrontEndpoint != "" && WasabiEndpoint != "" {
			ittt := store.NewITTTStore(store.NewCloudFrontROStore(WasabiEndpoint), store.NewCloudFrontROStore(cloudFrontEndpoint))
			if s3Store != nil {
				s = store.NewCloudFrontRWStore(ittt, s3Store)
			} else {
				s = ittt
			}
		} else if s3Store != nil {
			s = s3Store
		} else {
			log.Fatal("this configuration does not include a valid upstream source")
		}
	}
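
	// when --use-db is set, wrap the store so every stream access is tracked
	// in the reflector database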
	if useDB {
		dbInst := &db.SQL{
			TrackAccess: db.TrackAccessStreams,
			LogQueries:  log.GetLevel() == log.DebugLevel,
		}
		err := dbInst.Connect(globalConfig.DBConn)
		if err != nil {
			log.Fatal(err)
		}

		s = store.NewDBBackedStore(s, dbInst, false)
	}

	return s
}
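
// wrapWithCache layers optional caches in front of the origin store, from
// slowest to fastest: a db-backed disk cache with its own periodic cleaner,
// a buffer disk cache evicted by LFUDA, and an in-memory LRU cache.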
func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
	wrapped := s

	diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
	// blobs are at most stream.MaxBlobSize (2MB) each, so the cache size in
	// bytes divided by the max blob size is the maximum number of blobs the
	// cache can hold; the cleaner uses that count as its eviction threshold
	realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
	if diskCacheMaxSize > 0 {
		err := os.MkdirAll(diskCachePath, os.ModePerm)
		if err != nil {
			log.Fatal(err)
		}

		localDb := &db.SQL{
			SoftDelete:  true,
			TrackAccess: db.TrackAccessBlobs,
			LogQueries:  log.GetLevel() == log.DebugLevel,
		}
		err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
		if err != nil {
			log.Fatal(err)
		}
		dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true)
		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			dbBackedDiskStore,
		)

		go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
	}
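
	// an optional second disk cache, tagged "nvme" below, can be layered in
	// front of the main one as a faster buffer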
	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
	realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
	if diskCacheMaxSize > 0 {
		err := os.MkdirAll(diskCachePath, os.ModePerm)
		if err != nil {
			log.Fatal(err)
		}
		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize),
		)
	}

	if reflectorCmdMemCache > 0 {
		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			store.NewLRUStore("mem", store.NewMemStore(), reflectorCmdMemCache),
		)
	}

	return wrapped
}
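
// diskCacheParams parses a 'size:path' value such as "500GB:/mnt/cache" into
// a size in bytes and an absolute path. It returns (0, "") when the flag is
// unset and exits the process on malformed input.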
func diskCacheParams(diskParams string) (int, string) {
	if diskParams == "" {
		return 0, ""
	}

	parts := strings.Split(diskParams, ":")
	if len(parts) != 2 {
		log.Fatal("--disk-cache must be a size, followed by ':', followed by a path")
	}

	diskCacheSize := parts[0]
	path := parts[1]
	if len(path) == 0 || path[0] != '/' {
		log.Fatal("--disk-cache path must start with '/'")
	}

	var maxSize datasize.ByteSize
	err := maxSize.UnmarshalText([]byte(diskCacheSize))
	if err != nil {
		log.Fatal(err)
	}
	if maxSize <= 0 {
		log.Fatal("--disk-cache size must be more than 0")
	}
	return int(maxSize), path
}
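
// cleanOldestBlobs evicts least-recently-accessed blobs from the disk cache:
// once at startup and then every cleanupInterval, until the stopper fires.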
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
	// run once immediately so the first cleanup doesn't wait 10 minutes
	err := doClean(maxItems, db, store, stopper)
	if err != nil {
		log.Error(errors.FullTrace(err))
	}
	const cleanupInterval = 10 * time.Minute
	for {
		select {
		case <-stopper.Ch():
			log.Infoln("stopping self cleanup")
			return
		case <-time.After(cleanupInterval):
			err := doClean(maxItems, db, store, stopper)
			if err != nil {
				log.Error(errors.FullTrace(err))
			}
		}
	}
}
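
// doClean deletes the least recently accessed 10% of cached blobs once the
// cache holds maxItems blobs or more, using one producer feeding three
// deletion workers so a shutdown can interrupt the sweep.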
func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
	blobsCount, err := db.Count()
	if err != nil {
		return err
	}
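
	// once the cache is at capacity, evict the least recently accessed 10%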
	if blobsCount >= maxItems {
		itemsToDelete := blobsCount / 10
		blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
		if err != nil {
			return err
		}
		blobsChan := make(chan string, len(blobs))
		wg := &stop.Group{}
		// producer: queue hashes for deletion, bailing out early on shutdown
		go func() {
			// close the channel even on early shutdown so the workers
			// below can't block forever on a receive
			defer close(blobsChan)
			for _, hash := range blobs {
				select {
				case <-stopper.Ch():
					return
				default:
				}
				blobsChan <- hash
			}
		}()
		// three workers drain the channel and delete blobs from the store
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for h := range blobsChan {
					select {
					case <-stopper.Ch():
						return
					default:
					}
					// use a local err so the workers don't race on the
					// enclosing function's err variable
					err := store.Delete(h)
					if err != nil {
						log.Errorf("error pruning %s: %s", h, errors.FullTrace(err))
						continue
					}
				}
			}()
		}
		wg.Wait()
	}
	return nil
}