add token cache
This commit is contained in:
parent
766f4f101d
commit
f61ea53c8c
2 changed files with 76 additions and 21 deletions
29
dht/dht.go
29
dht/dht.go
|
@ -94,6 +94,8 @@ type DHT struct {
|
|||
lock *sync.RWMutex
|
||||
// list of bitmaps that need to be reannounced periodically
|
||||
announced map[bits.Bitmap]bool
|
||||
// cache for store tokens
|
||||
tokenCache *tokenCache
|
||||
}
|
||||
|
||||
// New returns a DHT pointer. If config is nil, then config will be set to the default config.
|
||||
|
@ -120,6 +122,7 @@ func (dht *DHT) connect(conn UDPConn) error {
|
|||
|
||||
dht.contact = contact
|
||||
dht.node = NewNode(contact.ID)
|
||||
dht.tokenCache = newTokenCache(dht.node, tokenSecretRotationInterval)
|
||||
|
||||
err = dht.node.Connect(conn)
|
||||
if err != nil {
|
||||
|
@ -142,7 +145,7 @@ func (dht *DHT) Start() error {
|
|||
}
|
||||
|
||||
dht.join()
|
||||
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)",
|
||||
log.Infof("[%s] DHT ready on %s (%d nodes found during join)",
|
||||
dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
|
||||
|
||||
go dht.startReannouncer()
|
||||
|
@ -154,7 +157,7 @@ func (dht *DHT) Start() error {
|
|||
func (dht *DHT) join() {
|
||||
defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
|
||||
|
||||
log.Debugf("[%s] joining network", dht.node.id.HexShort())
|
||||
log.Infof("[%s] joining DHT network", dht.node.id.HexShort())
|
||||
|
||||
// ping nodes, which gets their real node IDs and adds them to the routing table
|
||||
atLeastOneNodeResponded := false
|
||||
|
@ -312,30 +315,14 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
|
|||
return
|
||||
}
|
||||
|
||||
token := dht.tokenCache.Get(c, hash, dht.stop.Ch())
|
||||
|
||||
resCh, cancel := dht.node.SendCancelable(c, Request{
|
||||
Method: findValueMethod,
|
||||
Arg: &hash,
|
||||
})
|
||||
|
||||
var res *Response
|
||||
|
||||
select {
|
||||
case res = <-resCh:
|
||||
case <-dht.stop.Ch():
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return // request timed out
|
||||
}
|
||||
|
||||
resCh, cancel = dht.node.SendCancelable(c, Request{
|
||||
Method: storeMethod,
|
||||
StoreArgs: &storeArgs{
|
||||
BlobHash: hash,
|
||||
Value: storeArgsValue{
|
||||
Token: res.Token,
|
||||
Token: token,
|
||||
LbryID: dht.contact.ID,
|
||||
Port: dht.conf.PeerProtocolPort,
|
||||
},
|
||||
|
|
68
dht/token_cache.go
Normal file
68
dht/token_cache.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package dht
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/dht/bits"
|
||||
|
||||
"github.com/lbryio/lbry.go/stopOnce"
|
||||
)
|
||||
|
||||
type tokenCacheEntry struct {
|
||||
token string
|
||||
receivedAt time.Time
|
||||
}
|
||||
|
||||
type tokenCache struct {
|
||||
node *Node
|
||||
tokens map[string]tokenCacheEntry
|
||||
expiration time.Duration
|
||||
lock *sync.RWMutex
|
||||
}
|
||||
|
||||
func newTokenCache(node *Node, expiration time.Duration) *tokenCache {
|
||||
tc := &tokenCache{}
|
||||
tc.node = node
|
||||
tc.tokens = make(map[string]tokenCacheEntry)
|
||||
tc.expiration = expiration
|
||||
tc.lock = &sync.RWMutex{}
|
||||
return tc
|
||||
}
|
||||
|
||||
func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string {
|
||||
tc.lock.RLock()
|
||||
token, exists := tc.tokens[c.String()]
|
||||
tc.lock.RUnlock()
|
||||
|
||||
if exists && time.Since(token.receivedAt) < tc.expiration {
|
||||
return token.token
|
||||
}
|
||||
|
||||
resCh, cancel := tc.node.SendCancelable(c, Request{
|
||||
Method: findValueMethod,
|
||||
Arg: &hash,
|
||||
})
|
||||
|
||||
var res *Response
|
||||
|
||||
select {
|
||||
case res = <-resCh:
|
||||
case <-cancelCh:
|
||||
cancel()
|
||||
return ""
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
tc.lock.Lock()
|
||||
tc.tokens[c.String()] = tokenCacheEntry{
|
||||
token: res.Token,
|
||||
receivedAt: time.Now(),
|
||||
}
|
||||
tc.lock.Unlock()
|
||||
|
||||
return res.Token
|
||||
}
|
Loading…
Reference in a new issue