reflector.go/cmd/reflector.go
2021-07-24 01:03:51 +02:00

325 lines
9.7 KiB
Go

package cmd
import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/peer/http3"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/c2h5oh/datasize"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var (
tcpPeerPort int
http3PeerPort int
receiverPort int
metricsPort int
disableUploads bool
disableBlocklist bool
proxyAddress string
proxyPort string
proxyProtocol string
useDB bool
cloudFrontEndpoint string
WasabiEndpoint string
reflectorCmdDiskCache string
bufferReflectorCmdDiskCache string
reflectorCmdMemCache int
requestQueueSize int
)
func init() {
var cmd = &cobra.Command{
Use: "reflector",
Short: "Run reflector server",
Run: reflectorCmd,
}
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval")
cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval")
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests should be submitted to upstream")
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
"enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "",
"enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd)
}
func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
cleanerStopper := stop.New()
// the blocklist logic requires the db backed store to be the outer-most store
underlyingStore := setupStore()
outerStore := wrapWithCache(underlyingStore, cleanerStopper)
if !disableUploads {
reflectorServer := reflector.NewServer(underlyingStore, outerStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil {
log.Fatal(err)
}
defer reflectorServer.Shutdown()
}
peerServer := peer.NewServer(outerStore)
err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil {
log.Fatal(err)
}
defer peerServer.Shutdown()
http3PeerServer := http3.NewServer(outerStore, requestQueueSize)
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
if err != nil {
log.Fatal(err)
}
defer http3PeerServer.Shutdown()
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start()
defer metricsServer.Shutdown()
defer outerStore.Shutdown()
defer underlyingStore.Shutdown()
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
// deferred shutdowns happen now
cleanerStopper.StopAndWait()
}
func setupStore() store.BlobStore {
var s store.BlobStore
if proxyAddress != "" {
switch proxyProtocol {
case "tcp":
s = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
s = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
}
} else {
var s3Store *store.S3Store
if conf != "none" {
s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
}
if cloudFrontEndpoint != "" && WasabiEndpoint != "" {
ittt := store.NewITTTStore(store.NewCloudFrontROStore(WasabiEndpoint), store.NewCloudFrontROStore(cloudFrontEndpoint))
if s3Store != nil {
s = store.NewCloudFrontRWStore(ittt, s3Store)
} else {
s = ittt
}
} else if s3Store != nil {
s = s3Store
} else {
log.Fatalf("this configuration does not include a valid upstream source")
}
}
if useDB {
dbInst := &db.SQL{
TrackAccess: db.TrackAccessStreams,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err := dbInst.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s = store.NewDBBackedStore(s, dbInst, false)
}
return s
}
func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
//we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte
// so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup
realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
localDb := &db.SQL{
SoftDelete: true,
TrackAccess: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
if err != nil {
log.Fatal(err)
}
dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true)
wrapped = store.NewCachingStore(
"reflector",
wrapped,
dbBackedDiskStore,
)
go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
}
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize),
)
}
if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("mem", store.NewMemStore(), reflectorCmdMemCache),
)
}
return wrapped
}
func diskCacheParams(diskParams string) (int, string) {
if diskParams == "" {
return 0, ""
}
parts := strings.Split(diskParams, ":")
if len(parts) != 2 {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
}
diskCacheSize := parts[0]
path := parts[1]
if len(path) == 0 || path[0] != '/' {
log.Fatalf("--disk-cache path must start with '/'")
}
var maxSize datasize.ByteSize
err := maxSize.UnmarshalText([]byte(diskCacheSize))
if err != nil {
log.Fatal(err)
}
if maxSize <= 0 {
log.Fatal("--disk-cache size must be more than 0")
}
return int(maxSize), path
}
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
// this is so that it runs on startup without having to wait for 10 minutes
err := doClean(maxItems, db, store, stopper)
if err != nil {
log.Error(errors.FullTrace(err))
}
const cleanupInterval = 10 * time.Minute
for {
select {
case <-stopper.Ch():
log.Infoln("stopping self cleanup")
return
case <-time.After(cleanupInterval):
err := doClean(maxItems, db, store, stopper)
if err != nil {
log.Error(errors.FullTrace(err))
}
}
}
}
func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
blobsCount, err := db.Count()
if err != nil {
return err
}
if blobsCount >= maxItems {
itemsToDelete := blobsCount / 10
blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
if err != nil {
return err
}
blobsChan := make(chan string, len(blobs))
wg := &stop.Group{}
go func() {
for _, hash := range blobs {
select {
case <-stopper.Ch():
return
default:
}
blobsChan <- hash
}
close(blobsChan)
}()
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for h := range blobsChan {
select {
case <-stopper.Ch():
return
default:
}
err = store.Delete(h)
if err != nil {
log.Errorf("error pruning %s: %s", h, errors.FullTrace(err))
continue
}
}
}()
}
wg.Wait()
}
return nil
}