diff --git a/locks/multiplelock.go b/locks/multiplelock.go new file mode 100644 index 0000000..c87b645 --- /dev/null +++ b/locks/multiplelock.go @@ -0,0 +1,131 @@ +package locks + +import ( + "fmt" + "sync" + "time" +) + +// MultipleLock is the main interface for multiLock based on key +type MultipleLock interface { + Lock(key string) + RLock(key string) + Unlock(key string) + RUnlock(key string) +} + +func NewMultipleLock() MultipleLock { + return &multiLock{ + locks: make(map[string]*itemLock), + mu: sync.Mutex{}, + } +} + +type itemLock struct { + lk *sync.RWMutex + cnt int64 +} + +// multiLock is an optimized locking system per locking key +type multiLock struct { + locks map[string]*itemLock + mu sync.Mutex // synchronize reads/writes to locks map +} + +func debugPrint(format string, a ...interface{}) { + debugEnabled := false + if debugEnabled { + a = append(a, time.Now().Format("04:05.000000")) + fmt.Printf(format+" at %s\n", a...) + } +} + +func (ml *multiLock) Lock(key string) { + debugPrint("mutex requested %s", key) + ml.mu.Lock() + debugPrint("mutex acquired %s", key) + + itmLock, exists := ml.locks[key] + if !exists { + debugPrint("new lock created %s", key) + itmLock = &itemLock{&sync.RWMutex{}, 0} + ml.locks[key] = itmLock + } + itmLock.cnt++ + debugPrint("releasing mutex %s", key) + ml.mu.Unlock() + + debugPrint("Lock requested %s", key) + itmLock.lk.Lock() + debugPrint("Lock acquired %s", key) +} + +func (ml *multiLock) RLock(key string) { + debugPrint("mutex requested %s", key) + ml.mu.Lock() + debugPrint("mutex acquired %s", key) + + itmLock, exists := ml.locks[key] + if !exists { + debugPrint("new lock created for %s", key) + itmLock = &itemLock{&sync.RWMutex{}, 0} + ml.locks[key] = itmLock + } + itmLock.cnt++ + debugPrint("releasing mutex %s", key) + ml.mu.Unlock() + + debugPrint("RLock requested %s", key) + itmLock.lk.RLock() + debugPrint("RLock acquired %s", key) +} + +func (ml *multiLock) Unlock(key string) { + debugPrint("mutex requested %s", key) + ml.mu.Lock() + debugPrint("mutex acquired %s", key) + + itmLock, exists := ml.locks[key] + if !exists { + panic("sync Unlock of non existent lock!!") + } + + debugPrint("Unlock %s", key) + itmLock.lk.Unlock() + itmLock.cnt-- + if itmLock.cnt == 0 { + debugPrint("delete lock %s", key) + delete(ml.locks, key) + } + if itmLock.cnt < 0 { + panic("sync Unlock of free Lock!!") + } + + debugPrint("releasing mutex %s", key) + ml.mu.Unlock() +} + +func (ml *multiLock) RUnlock(key string) { + debugPrint("mutex requested %s", key) + ml.mu.Lock() + debugPrint("mutex acquired %s", key) + + itmLock, exists := ml.locks[key] + if !exists { + panic("sync Unlock of non existent lock!!") + } + + debugPrint("RUnlock %s", key) + itmLock.lk.RUnlock() + itmLock.cnt-- + if itmLock.cnt == 0 { + debugPrint("delete lock %s", key) + delete(ml.locks, key) + } + if itmLock.cnt < 0 { + panic("sync Unlock of free Lock!!") + } + + debugPrint("releasing mutex %s", key) + ml.mu.Unlock() +} diff --git a/locks/multiplelock_test.go b/locks/multiplelock_test.go new file mode 100644 index 0000000..a7a42c7 --- /dev/null +++ b/locks/multiplelock_test.go @@ -0,0 +1,56 @@ +package locks + +import ( + "math/rand" + "strconv" + "testing" + "time" + + "github.com/lbryio/lbry.go/v2/extras/stop" +) + +var lock = NewMultipleLock() + +func TestNewMultipleLock(t *testing.T) { + grp := stop.New() + for i := 0; i < 100; i++ { + grp.Add(2) + go doRWWork(i, i%10, grp) + go doRWork(i, i%10, grp) + } + time.Sleep(5 * time.Second) + grp.StopAndWait() +} +func doRWWork(worker int, resource int, grp *stop.Group) { + for { + select { + case <-grp.Ch(): + grp.Done() + return + default: + //log.Printf("RW - worker %d doing work on resource %d\n", worker, resource) + lock.Lock(strconv.Itoa(resource)) + randomTime := time.Duration(rand.Int()%10+1) * time.Microsecond + time.Sleep(randomTime) + //log.Printf("RW - worker %d releasing %d\n", worker, resource) + lock.Unlock(strconv.Itoa(resource)) + } + } +} + +func doRWork(worker int, resource int, grp *stop.Group) { + for { + select { + case <-grp.Ch(): + grp.Done() + return + default: + //log.Printf("R - worker %d doing work on resource %d\n", worker, resource) + lock.RLock(strconv.Itoa(resource)) + randomTime := time.Duration(rand.Int()%10+1) * time.Microsecond + time.Sleep(randomTime) + //log.Printf("R - worker %d releasing %d\n", worker, resource) + lock.RUnlock(strconv.Itoa(resource)) + } + } +} diff --git a/store/disk.go b/store/disk.go index f108204..1515135 100644 --- a/store/disk.go +++ b/store/disk.go @@ -11,6 +11,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/locks" "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store/speedwalk" log "github.com/sirupsen/logrus" @@ -28,6 +29,7 @@ type DiskStore struct { initialized bool concurrentChecks atomic.Int32 + lock locks.MultipleLock } const maxConcurrentChecks = 3 @@ -37,6 +39,7 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore { return &DiskStore{ blobDir: dir, prefixLength: prefixLength, + lock: locks.NewMultipleLock(), } } @@ -47,6 +50,8 @@ func (d *DiskStore) Name() string { return nameDisk } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. func (d *DiskStore) Has(hash string) (bool, error) { + d.lock.RLock(hash) + defer d.lock.RUnlock(hash) err := d.initOnce() if err != nil { return false, err @@ -64,6 +69,8 @@ func (d *DiskStore) Has(hash string) (bool, error) { // Get returns the blob or an error if the blob doesn't exist. func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + d.lock.RLock(hash) + defer d.lock.RUnlock(hash) start := time.Now() err := d.initOnce() if err != nil { @@ -101,6 +108,8 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { // Put stores the blob on disk func (d *DiskStore) Put(hash string, blob stream.Blob) error { + d.lock.Lock(hash) + defer d.lock.Unlock(hash) start := time.Now() defer func() { if time.Since(start) > 100*time.Millisecond { @@ -141,11 +150,15 @@ reading after writing: hash match: %t`, hash, matchesBeforeWriting, err == nil, // PutSD stores the sd blob on the disk func (d *DiskStore) PutSD(hash string, blob stream.Blob) error { + d.lock.Lock(hash) + defer d.lock.Unlock(hash) return d.Put(hash, blob) } // Delete deletes the blob from the store func (d *DiskStore) Delete(hash string) error { + d.lock.Lock(hash) + defer d.lock.Unlock(hash) err := d.initOnce() if err != nil { return err