when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
3 changed files with 20 additions and 16 deletions
Showing only changes of commit bb41a84bb7 - Show all commits

View file

@ -159,9 +159,10 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
wrapped, wrapped,
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize), store.NewLRUStore("hdd", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
) )
} }
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache) diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
if diskCacheMaxSize > 0 { if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm) err := os.MkdirAll(diskCachePath, os.ModePerm)
@ -171,14 +172,15 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
wrapped, wrapped,
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize), store.NewLRUStore("nvme", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
) )
} }
if reflectorCmdMemCache > 0 { if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
wrapped, wrapped,
store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache), store.NewLRUStore("mem", store.NewMemStore(), reflectorCmdMemCache),
) )
} }

View file

@ -18,18 +18,20 @@ type LRUStore struct {
// NewLRUStore initialize a new LRUStore // NewLRUStore initialize a new LRUStore
func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore { func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
l := &LRUStore{
store: store,
}
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) { lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc() metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
l := &LRUStore{ l.lru = lru
store: store,
lru: lru,
}
go func() { go func() {
if lstr, ok := store.(lister); ok { if lstr, ok := store.(lister); ok {
err = l.loadExisting(lstr, maxItems) err = l.loadExisting(lstr, maxItems)
@ -42,10 +44,10 @@ func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
return l return l
} }
const nameLRU = "lru"
// Name is the cache type name // Name is the cache type name
func (l *LRUStore) Name() string { return nameLRU } func (l *LRUStore) Name() string {
return "lru_" + l.store.Name()
}
// Has returns whether the blob is in the store, without updating the recent-ness. // Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) { func (l *LRUStore) Has(hash string) (bool, error) {

View file

@ -32,8 +32,8 @@ func (s *singleflightStore) Name() string {
// Get ensures that only one request per hash is sent to the origin at a time, // Get ensures that only one request per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem // thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (s *singleflightStore) Get(hash string) (stream.Blob, error) { func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc() metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec() defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
blob, err, _ := s.sf.Do(hash, s.getter(hash)) blob, err, _ := s.sf.Do(hash, s.getter(hash))
if err != nil { if err != nil {
@ -46,8 +46,8 @@ func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
// only one getter per hash will be executing at a time // only one getter per hash will be executing at a time
func (s *singleflightStore) getter(hash string) func() (interface{}, error) { func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
return func() (interface{}, error) { return func() (interface{}, error) {
metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc() metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec() defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
start := time.Now() start := time.Now()
blob, err := s.BlobStore.Get(hash) blob, err := s.BlobStore.Get(hash)
@ -57,7 +57,7 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.CacheRetrievalSpeed.With(map[string]string{ metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: s.BlobStore.Name(), metrics.LabelCacheType: s.Name(),
metrics.LabelComponent: s.component, metrics.LabelComponent: s.component,
metrics.LabelSource: "origin", metrics.LabelSource: "origin",
}).Set(rate) }).Set(rate)