Insert in tx #51

Closed
lyoshenka wants to merge 52 commits from insert_in_tx into master
6 changed files with 104 additions and 12 deletions
Showing only changes of commit ec3aae33ba - Show all commits

View file

@ -37,6 +37,7 @@ var (
proxyProtocol string proxyProtocol string
useDB bool useDB bool
cloudFrontEndpoint string cloudFrontEndpoint string
WasabiEndpoint string
reflectorCmdDiskCache string reflectorCmdDiskCache string
bufferReflectorCmdDiskCache string bufferReflectorCmdDiskCache string
reflectorCmdMemCache int reflectorCmdMemCache int
@ -52,6 +53,7 @@ func init() {
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval") cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval")
cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval")
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
@ -137,12 +139,12 @@ func setupStore() store.BlobStore {
if conf != "none" { if conf != "none" {
s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
} }
if cloudFrontEndpoint != "" { if cloudFrontEndpoint != "" && WasabiEndpoint != "" {
cfs := store.NewCloudFrontROStore(cloudFrontEndpoint) ittt := store.NewITTTStore(store.NewCloudFrontROStore(WasabiEndpoint), store.NewCloudFrontROStore(cloudFrontEndpoint))
if s3Store != nil { if s3Store != nil {
s = store.NewCloudFrontRWStore(cfs, s3Store) s = store.NewCloudFrontRWStore(ittt, s3Store)
} else { } else {
s = cfs s = ittt
} }
} else if s3Store != nil { } else if s3Store != nil {
s = s3Store s = s3Store

View file

@ -60,6 +60,7 @@ func (s *Server) Shutdown() {
const ( const (
ns = "reflector" ns = "reflector"
subsystemCache = "cache" subsystemCache = "cache"
subsystemITTT = "ittt"
labelDirection = "direction" labelDirection = "direction"
labelErrorType = "error_type" labelErrorType = "error_type"
@ -124,6 +125,18 @@ var (
Name: "hit_total", Name: "hit_total",
Help: "Total number of blobs retrieved from the cache storage", Help: "Total number of blobs retrieved from the cache storage",
}, []string{LabelCacheType, LabelComponent}) }, []string{LabelCacheType, LabelComponent})
ThisHitCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: subsystemITTT,
Name: "this_hit_total",
Help: "Total number of blobs retrieved from the this storage",
})
ThatHitCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: subsystemITTT,
Name: "that_hit_total",
Help: "Total number of blobs retrieved from the that storage",
})
CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Subsystem: subsystemCache, Subsystem: subsystemCache,

View file

@ -112,7 +112,7 @@ func (s *Server) Start(address string) error {
}, },
QuicConfig: quicConf, QuicConfig: quicConf,
} }
go InitWorkers(s, 100) go InitWorkers(s, 200)
go s.listenForShutdown(&server) go s.listenForShutdown(&server)
s.grp.Add(1) s.grp.Add(1)
go func() { go func() {

View file

@ -7,15 +7,15 @@ import (
"github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/shared"
) )
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3. // CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront/Wasabi, writes go to S3.
type CloudFrontRWStore struct { type CloudFrontRWStore struct {
cf *CloudFrontROStore cf *ITTTStore
s3 *S3Store s3 *S3Store
} }
// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer. // NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
// NOTE: It panics if either argument is nil. // NOTE: It panics if either argument is nil.
func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore { func NewCloudFrontRWStore(cf *ITTTStore, s3 *S3Store) *CloudFrontRWStore {
if cf == nil || s3 == nil { if cf == nil || s3 == nil {
panic("both stores must be set") panic("both stores must be set")
} }

75
store/ittt.go Normal file
View file

@ -0,0 +1,75 @@
package store
import (
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
)
// ITTT store performs an operation on this storage, if this fails, it attempts to run it on that
type ITTTStore struct {
this, that BlobStore
}
// NewCachingStore makes a new caching disk store and returns a pointer to it.
func NewITTTStore(this, that BlobStore) *ITTTStore {
return &ITTTStore{
this: this,
that: that,
}
}
const nameIttt = "ittt"
// Name is the cache type name
func (c *ITTTStore) Name() string { return nameIttt }
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
func (c *ITTTStore) Has(hash string) (bool, error) {
has, err := c.this.Has(hash)
if err != nil || !has {
has, err = c.that.Has(hash)
}
return has, err
}
// Get tries to get the blob from this first, falling back to that.
func (c *ITTTStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
blob, trace, err := c.this.Get(hash)
if err == nil {
metrics.ThisHitCount.Inc()
return blob, trace.Stack(time.Since(start), c.Name()), err
}
blob, trace, err = c.that.Get(hash)
if err != nil {
return nil, trace.Stack(time.Since(start), c.Name()), err
}
metrics.ThatHitCount.Inc()
return blob, trace.Stack(time.Since(start), c.Name()), nil
}
// Put not implemented
func (c *ITTTStore) Put(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// PutSD not implemented
func (c *ITTTStore) PutSD(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// Delete not implemented
func (c *ITTTStore) Delete(hash string) error {
return errors.Err(shared.ErrNotImplemented)
}
// Shutdown shuts down the store gracefully
func (c *ITTTStore) Shutdown() {
return
}

View file

@ -112,10 +112,11 @@ func (s *S3Store) Put(hash string, blob stream.Blob) error {
}(time.Now()) }(time.Now())
_, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{ _, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket), Bucket: aws.String(s.bucket),
Key: aws.String(hash), Key: aws.String(hash),
Body: bytes.NewBuffer(blob), Body: bytes.NewBuffer(blob),
StorageClass: aws.String(s3.StorageClassIntelligentTiering), ACL: aws.String("public-read"),
//StorageClass: aws.String(s3.StorageClassIntelligentTiering),
}) })
metrics.MtrOutBytesReflector.Add(float64(blob.Size())) metrics.MtrOutBytesReflector.Add(float64(blob.Size()))
@ -152,6 +153,7 @@ func (s *S3Store) initOnce() error {
sess, err := session.NewSession(&aws.Config{ sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
Region: aws.String(s.region), Region: aws.String(s.region),
Endpoint: aws.String("https://s3.wasabisys.com"),
}) })
if err != nil { if err != nil {
return err return err