add token cache

This commit is contained in:
Alex Grintsvayg 2018-06-21 13:40:22 -04:00
parent 7375c831fd
commit d41cbbd817
2 changed files with 76 additions and 21 deletions

View file

@ -94,6 +94,8 @@ type DHT struct {
lock *sync.RWMutex lock *sync.RWMutex
// list of bitmaps that need to be reannounced periodically // list of bitmaps that need to be reannounced periodically
announced map[bits.Bitmap]bool announced map[bits.Bitmap]bool
// cache for store tokens
tokenCache *tokenCache
} }
// New returns a DHT pointer. If config is nil, then config will be set to the default config. // New returns a DHT pointer. If config is nil, then config will be set to the default config.
@ -120,6 +122,7 @@ func (dht *DHT) connect(conn UDPConn) error {
dht.contact = contact dht.contact = contact
dht.node = NewNode(contact.ID) dht.node = NewNode(contact.ID)
dht.tokenCache = newTokenCache(dht.node, tokenSecretRotationInterval)
err = dht.node.Connect(conn) err = dht.node.Connect(conn)
if err != nil { if err != nil {
@ -142,7 +145,7 @@ func (dht *DHT) Start() error {
} }
dht.join() dht.join()
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", log.Infof("[%s] DHT ready on %s (%d nodes found during join)",
dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
go dht.startReannouncer() go dht.startReannouncer()
@ -154,7 +157,7 @@ func (dht *DHT) Start() error {
func (dht *DHT) join() { func (dht *DHT) join() {
defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
log.Debugf("[%s] joining network", dht.node.id.HexShort()) log.Infof("[%s] joining DHT network", dht.node.id.HexShort())
// ping nodes, which gets their real node IDs and adds them to the routing table // ping nodes, which gets their real node IDs and adds them to the routing table
atLeastOneNodeResponded := false atLeastOneNodeResponded := false
@ -312,30 +315,14 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
return return
} }
token := dht.tokenCache.Get(c, hash, dht.stop.Ch())
resCh, cancel := dht.node.SendCancelable(c, Request{ resCh, cancel := dht.node.SendCancelable(c, Request{
Method: findValueMethod,
Arg: &hash,
})
var res *Response
select {
case res = <-resCh:
case <-dht.stop.Ch():
cancel()
return
}
if res == nil {
return // request timed out
}
resCh, cancel = dht.node.SendCancelable(c, Request{
Method: storeMethod, Method: storeMethod,
StoreArgs: &storeArgs{ StoreArgs: &storeArgs{
BlobHash: hash, BlobHash: hash,
Value: storeArgsValue{ Value: storeArgsValue{
Token: res.Token, Token: token,
LbryID: dht.contact.ID, LbryID: dht.contact.ID,
Port: dht.conf.PeerProtocolPort, Port: dht.conf.PeerProtocolPort,
}, },

68
dht/token_cache.go Normal file
View file

@ -0,0 +1,68 @@
package dht
import (
"sync"
"time"
"github.com/lbryio/reflector.go/dht/bits"
"github.com/lbryio/lbry.go/stopOnce"
)
type tokenCacheEntry struct {
token string
receivedAt time.Time
}
type tokenCache struct {
node *Node
tokens map[string]tokenCacheEntry
expiration time.Duration
lock *sync.RWMutex
}
func newTokenCache(node *Node, expiration time.Duration) *tokenCache {
tc := &tokenCache{}
tc.node = node
tc.tokens = make(map[string]tokenCacheEntry)
tc.expiration = expiration
tc.lock = &sync.RWMutex{}
return tc
}
func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string {
tc.lock.RLock()
token, exists := tc.tokens[c.String()]
tc.lock.RUnlock()
if exists && time.Since(token.receivedAt) < tc.expiration {
return token.token
}
resCh, cancel := tc.node.SendCancelable(c, Request{
Method: findValueMethod,
Arg: &hash,
})
var res *Response
select {
case res = <-resCh:
case <-cancelCh:
cancel()
return ""
}
if res == nil {
return ""
}
tc.lock.Lock()
tc.tokens[c.String()] = tokenCacheEntry{
token: res.Token,
receivedAt: time.Now(),
}
tc.lock.Unlock()
return res.Token
}