lbry.go/dht/transaction_manager.go

124 lines
3 KiB
Go
Raw Normal View History

package dht
import (
2018-03-29 03:05:27 +02:00
"context"
"net"
2018-04-05 17:35:57 +02:00
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
2018-04-04 17:43:27 +02:00
// transaction represents a single query to the dht. it stores the queried node, the request, and the response channel
type transaction struct {
2018-03-29 03:05:27 +02:00
node Node
2018-04-04 17:43:27 +02:00
req Request
res chan Response
}
2018-04-04 17:43:27 +02:00
// transactionManager keeps track of the outstanding transactions
type transactionManager struct {
2018-04-04 17:43:27 +02:00
dht *DHT
lock *sync.RWMutex
2018-04-03 19:38:01 +02:00
transactions map[messageID]*transaction
}
2018-04-04 17:43:27 +02:00
// newTransactionManager returns a new transactionManager
func newTransactionManager(dht *DHT) *transactionManager {
return &transactionManager{
lock: &sync.RWMutex{},
2018-04-03 19:38:01 +02:00
transactions: make(map[messageID]*transaction),
dht: dht,
}
}
2018-04-04 17:43:27 +02:00
// insert adds a transaction to the manager.
func (tm *transactionManager) insert(tx *transaction) {
tm.lock.Lock()
defer tm.lock.Unlock()
2018-04-04 17:43:27 +02:00
tm.transactions[tx.req.ID] = tx
}
2018-04-04 17:43:27 +02:00
// delete removes a transaction from the manager.
2018-04-03 19:38:01 +02:00
func (tm *transactionManager) delete(id messageID) {
tm.lock.Lock()
defer tm.lock.Unlock()
2018-04-03 19:38:01 +02:00
delete(tm.transactions, id)
}
2018-04-04 17:43:27 +02:00
// Find finds a transaction for the given id. it optionally ensures that addr matches node from transaction
2018-04-03 19:38:01 +02:00
func (tm *transactionManager) Find(id messageID, addr *net.UDPAddr) *transaction {
tm.lock.RLock()
defer tm.lock.RUnlock()
t, ok := tm.transactions[id]
2018-04-04 17:43:27 +02:00
if !ok || (addr != nil && t.node.Addr().String() != addr.String()) {
return nil
}
return t
}
2018-04-04 17:43:27 +02:00
// SendAsync sends a transaction and returns a channel that will eventually contain the transaction response
// The response channel is closed when the transaction is completed or times out.
func (tm *transactionManager) SendAsync(ctx context.Context, node Node, req Request) <-chan *Response {
if node.id.Equals(tm.dht.node.id) {
log.Error("sending query to self")
return nil
}
2018-03-29 03:05:27 +02:00
ch := make(chan *Response, 1)
2018-03-29 03:05:27 +02:00
go func() {
defer close(ch)
2018-03-29 03:05:27 +02:00
req.ID = newMessageID()
2018-04-03 19:38:01 +02:00
req.NodeID = tm.dht.node.id
2018-04-04 17:43:27 +02:00
tx := &transaction{
2018-03-29 03:05:27 +02:00
node: node,
req: req,
2018-04-04 17:43:27 +02:00
res: make(chan Response),
}
2018-04-04 17:43:27 +02:00
tm.insert(tx)
defer tm.delete(tx.req.ID)
2018-03-29 03:05:27 +02:00
for i := 0; i < udpRetry; i++ {
2018-04-04 17:43:27 +02:00
if err := send(tm.dht, node.Addr(), tx.req); err != nil {
2018-04-05 17:35:57 +02:00
if !strings.Contains(err.Error(), "use of closed network connection") { // this only happens on localhost. real UDP has no connections
log.Error("send error: ", err)
}
2018-03-29 03:05:27 +02:00
continue // try again? return?
}
select {
2018-04-04 17:43:27 +02:00
case res := <-tx.res:
ch <- &res
2018-03-29 03:05:27 +02:00
return
case <-ctx.Done():
return
case <-time.After(udpTimeout):
}
}
2018-03-29 03:05:27 +02:00
// if request timed out each time
2018-04-04 17:43:27 +02:00
tm.dht.rt.RemoveByID(tx.node.id)
2018-03-29 03:05:27 +02:00
}()
return ch
}
2018-04-04 17:43:27 +02:00
// Send sends a transaction and blocks until the response is available. It returns a response, or nil
// if the transaction timed out.
func (tm *transactionManager) Send(node Node, req Request) *Response {
2018-03-29 03:05:27 +02:00
return <-tm.SendAsync(context.Background(), node, req)
}
// Count returns the number of transactions in the manager
func (tm *transactionManager) Count() int {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.transactions)
}