diff --git a/locks/multiplelock.go b/locks/multiplelock.go deleted file mode 100644 index c87b645..0000000 --- a/locks/multiplelock.go +++ /dev/null @@ -1,131 +0,0 @@ -package locks - -import ( - "fmt" - "sync" - "time" -) - -// MultipleLock is the main interface for multiLock based on key -type MultipleLock interface { - Lock(key string) - RLock(key string) - Unlock(key string) - RUnlock(key string) -} - -func NewMultipleLock() MultipleLock { - return &multiLock{ - locks: make(map[string]*itemLock), - mu: sync.Mutex{}, - } -} - -type itemLock struct { - lk *sync.RWMutex - cnt int64 -} - -// multiLock is an optimized locking system per locking key -type multiLock struct { - locks map[string]*itemLock - mu sync.Mutex // synchronize reads/writes to locks map -} - -func debugPrint(format string, a ...interface{}) { - debugEnabled := false - if debugEnabled { - a = append(a, time.Now().Format("04:05.000000")) - fmt.Printf(format+" at %s\n", a...) - } -} - -func (ml *multiLock) Lock(key string) { - debugPrint("mutex requested %s", key) - ml.mu.Lock() - debugPrint("mutex acquired %s", key) - - itmLock, exists := ml.locks[key] - if !exists { - debugPrint("new lock created %s", key) - itmLock = &itemLock{&sync.RWMutex{}, 0} - ml.locks[key] = itmLock - } - itmLock.cnt++ - debugPrint("releasing mutex %s", key) - ml.mu.Unlock() - - debugPrint("Lock requested %s", key) - itmLock.lk.Lock() - debugPrint("Lock acquired %s", key) -} - -func (ml *multiLock) RLock(key string) { - debugPrint("mutex requested %s", key) - ml.mu.Lock() - debugPrint("mutex acquired %s", key) - - itmLock, exists := ml.locks[key] - if !exists { - debugPrint("new lock created for %s", key) - itmLock = &itemLock{&sync.RWMutex{}, 0} - ml.locks[key] = itmLock - } - itmLock.cnt++ - debugPrint("releasing mutex %s", key) - ml.mu.Unlock() - - debugPrint("RLock requested %s", key) - itmLock.lk.RLock() - debugPrint("RLock acquired %s", key) -} - -func (ml *multiLock) Unlock(key string) { - debugPrint("mutex requested %s", key) - ml.mu.Lock() - debugPrint("mutex acquired %s", key) - - itmLock, exists := ml.locks[key] - if !exists { - panic("sync Unlock of non existent lock!!") - } - - debugPrint("Unlock %s", key) - itmLock.lk.Unlock() - itmLock.cnt-- - if itmLock.cnt == 0 { - debugPrint("delete lock %s", key) - delete(ml.locks, key) - } - if itmLock.cnt < 0 { - panic("sync Unlock of free Lock!!") - } - - debugPrint("releasing mutex %s", key) - ml.mu.Unlock() -} - -func (ml *multiLock) RUnlock(key string) { - debugPrint("mutex requested %s", key) - ml.mu.Lock() - debugPrint("mutex acquired %s", key) - - itmLock, exists := ml.locks[key] - if !exists { - panic("sync Unlock of non existent lock!!") - } - - debugPrint("RUnlock %s", key) - itmLock.lk.RUnlock() - itmLock.cnt-- - if itmLock.cnt == 0 { - debugPrint("delete lock %s", key) - delete(ml.locks, key) - } - if itmLock.cnt < 0 { - panic("sync Unlock of free Lock!!") - } - - debugPrint("releasing mutex %s", key) - ml.mu.Unlock() -} diff --git a/locks/multiplelock_test.go b/locks/multiplelock_test.go deleted file mode 100644 index a7a42c7..0000000 --- a/locks/multiplelock_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package locks - -import ( - "math/rand" - "strconv" - "testing" - "time" - - "github.com/lbryio/lbry.go/v2/extras/stop" -) - -var lock = NewMultipleLock() - -func TestNewMultipleLock(t *testing.T) { - grp := stop.New() - for i := 0; i < 100; i++ { - grp.Add(2) - go doRWWork(i, i%10, grp) - go doRWork(i, i%10, grp) - } - time.Sleep(5 * time.Second) - grp.StopAndWait() -} -func doRWWork(worker int, resource int, grp *stop.Group) { - for { - select { - case <-grp.Ch(): - grp.Done() - return - default: - //log.Printf("RW - worker %d doing work on resource %d\n", worker, resource) - lock.Lock(strconv.Itoa(resource)) - randomTime := time.Duration(rand.Int()%10+1) * time.Microsecond - time.Sleep(randomTime) - //log.Printf("RW - worker %d releasing %d\n", worker, resource) - lock.Unlock(strconv.Itoa(resource)) - } - } -} - -func doRWork(worker int, resource int, grp *stop.Group) { - for { - select { - case <-grp.Ch(): - grp.Done() - return - default: - //log.Printf("R - worker %d doing work on resource %d\n", worker, resource) - lock.RLock(strconv.Itoa(resource)) - randomTime := time.Duration(rand.Int()%10+1) * time.Microsecond - time.Sleep(randomTime) - //log.Printf("R - worker %d releasing %d\n", worker, resource) - lock.RUnlock(strconv.Itoa(resource)) - } - } -} diff --git a/store/disk.go b/store/disk.go index fb5e326..5ef7006 100644 --- a/store/disk.go +++ b/store/disk.go @@ -12,7 +12,6 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/locks" "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store/speedwalk" log "github.com/sirupsen/logrus" @@ -49,7 +48,6 @@ type DiskStore struct { initialized bool concurrentChecks atomic.Int32 - lock locks.MultipleLock } const maxConcurrentChecks = 3 @@ -59,7 +57,6 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore { return &DiskStore{ blobDir: dir, prefixLength: prefixLength, - lock: locks.NewMultipleLock(), } } @@ -70,8 +67,6 @@ func (d *DiskStore) Name() string { return nameDisk } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. func (d *DiskStore) Has(hash string) (bool, error) { - d.lock.RLock(hash) - defer d.lock.RUnlock(hash) err := d.initOnce() if err != nil { return false, err @@ -89,8 +84,6 @@ func (d *DiskStore) Has(hash string) (bool, error) { // Get returns the blob or an error if the blob doesn't exist. func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { - d.lock.RLock(hash) - defer d.lock.RUnlock(hash) start := time.Now() err := d.initOnce() if err != nil { @@ -128,8 +121,6 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { // Put stores the blob on disk func (d *DiskStore) Put(hash string, blob stream.Blob) error { - d.lock.Lock(hash) - defer d.lock.Unlock(hash) err := d.initOnce() if err != nil { return err @@ -145,15 +136,11 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error { // PutSD stores the sd blob on the disk func (d *DiskStore) PutSD(hash string, blob stream.Blob) error { - d.lock.Lock(hash) - defer d.lock.Unlock(hash) return d.Put(hash, blob) } // Delete deletes the blob from the store func (d *DiskStore) Delete(hash string) error { - d.lock.Lock(hash) - defer d.lock.Unlock(hash) err := d.initOnce() if err != nil { return err diff --git a/store/http.go b/store/http.go index 8a934d7..34461de 100644 --- a/store/http.go +++ b/store/http.go @@ -94,7 +94,7 @@ func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { blob := make([]byte, written) copy(blob, tmp.Bytes()) metrics.MtrInBytesHttp.Add(float64(len(blob))) - return blob, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound + return blob, trace.Stack(time.Since(start), n.Name()), nil } var body []byte if res.Body != nil {