From d41cbbd8179fd5dec9c9b91f64ef193f4397f1a4 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 21 Jun 2018 13:40:22 -0400 Subject: [PATCH] add token cache --- dht/dht.go | 29 ++++++-------------- dht/token_cache.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 dht/token_cache.go diff --git a/dht/dht.go b/dht/dht.go index eb7a9be..a9f98c8 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -94,6 +94,8 @@ type DHT struct { lock *sync.RWMutex // list of bitmaps that need to be reannounced periodically announced map[bits.Bitmap]bool + // cache for store tokens + tokenCache *tokenCache } // New returns a DHT pointer. If config is nil, then config will be set to the default config. @@ -120,6 +122,7 @@ func (dht *DHT) connect(conn UDPConn) error { dht.contact = contact dht.node = NewNode(contact.ID) + dht.tokenCache = newTokenCache(dht.node, tokenSecretRotationInterval) err = dht.node.Connect(conn) if err != nil { @@ -142,7 +145,7 @@ func (dht *DHT) Start() error { } dht.join() - log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", + log.Infof("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) go dht.startReannouncer() @@ -154,7 +157,7 @@ func (dht *DHT) Start() error { func (dht *DHT) join() { defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done - log.Debugf("[%s] joining network", dht.node.id.HexShort()) + log.Infof("[%s] joining DHT network", dht.node.id.HexShort()) // ping nodes, which gets their real node IDs and adds them to the routing table atLeastOneNodeResponded := false @@ -312,30 +315,14 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) { return } + token := dht.tokenCache.Get(c, hash, dht.stop.Ch()) + resCh, cancel := dht.node.SendCancelable(c, Request{ - Method: findValueMethod, - Arg: &hash, - }) - - var res *Response - - select { - case res = <-resCh: - case <-dht.stop.Ch(): - cancel() - return - } - - if res == nil { - return // request timed out - } - - resCh, cancel = dht.node.SendCancelable(c, Request{ Method: storeMethod, StoreArgs: &storeArgs{ BlobHash: hash, Value: storeArgsValue{ - Token: res.Token, + Token: token, LbryID: dht.contact.ID, Port: dht.conf.PeerProtocolPort, }, diff --git a/dht/token_cache.go b/dht/token_cache.go new file mode 100644 index 0000000..010e0db --- /dev/null +++ b/dht/token_cache.go @@ -0,0 +1,68 @@ +package dht + +import ( + "sync" + "time" + + "github.com/lbryio/reflector.go/dht/bits" + + "github.com/lbryio/lbry.go/stopOnce" +) + +type tokenCacheEntry struct { + token string + receivedAt time.Time +} + +type tokenCache struct { + node *Node + tokens map[string]tokenCacheEntry + expiration time.Duration + lock *sync.RWMutex +} + +func newTokenCache(node *Node, expiration time.Duration) *tokenCache { + tc := &tokenCache{} + tc.node = node + tc.tokens = make(map[string]tokenCacheEntry) + tc.expiration = expiration + tc.lock = &sync.RWMutex{} + return tc +} + +func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string { + tc.lock.RLock() + token, exists := tc.tokens[c.String()] + tc.lock.RUnlock() + + if exists && time.Since(token.receivedAt) < tc.expiration { + return token.token + } + + resCh, cancel := tc.node.SendCancelable(c, Request{ + Method: findValueMethod, + Arg: &hash, + }) + + var res *Response + + select { + case res = <-resCh: + case <-cancelCh: + cancel() + return "" + } + + if res == nil { + return "" + } + + tc.lock.Lock() + tc.tokens[c.String()] = tokenCacheEntry{ + token: res.Token, + receivedAt: time.Now(), + } + tc.lock.Unlock() + + return res.Token +}