96 lines
3.4 KiB
Python
96 lines
3.4 KiB
Python
import logging
|
|
import random
|
|
import time
|
|
|
|
from twisted.internet import defer
|
|
from twisted.internet.task import LoopingCall
|
|
from decimal import Decimal
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class BlobAvailabilityTracker(object):
|
|
"""
|
|
Class to track peer counts for known blobs, and to discover new popular blobs
|
|
|
|
Attributes:
|
|
availability (dict): dictionary of peers for known blobs
|
|
"""
|
|
|
|
def __init__(self, blob_manager, peer_finder, dht_node):
|
|
self.availability = {}
|
|
self._last_mean_availability = Decimal(0.0)
|
|
self._blob_manager = blob_manager
|
|
self._peer_finder = peer_finder
|
|
self._dht_node = dht_node
|
|
self._check_mine = LoopingCall(self._update_mine)
|
|
|
|
def start(self):
|
|
log.info("Starting blob availability tracker.")
|
|
self._check_mine.start(600)
|
|
|
|
def stop(self):
|
|
log.info("Stopping blob availability tracker.")
|
|
if self._check_mine.running:
|
|
self._check_mine.stop()
|
|
|
|
def get_blob_availability(self, blob, timeout=None):
|
|
def _get_peer_count(peers):
|
|
have_blob = sum(1 for peer in peers if peer.is_available())
|
|
return {blob: have_blob}
|
|
|
|
d = self._peer_finder.find_peers_for_blob(blob, timeout)
|
|
d.addCallback(_get_peer_count)
|
|
return d
|
|
|
|
def get_availability_for_blobs(self, blobs, timeout=None):
|
|
dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob]
|
|
d = defer.DeferredList(dl)
|
|
d.addCallback(lambda results: [val for success, val in results if success])
|
|
return d
|
|
|
|
@property
|
|
def last_mean_availability(self):
|
|
return max(Decimal(0.01), self._last_mean_availability)
|
|
|
|
def _update_peers_for_blob(self, blob):
|
|
def _save_peer_info(blob_hash, peers):
|
|
v = {blob_hash: peers}
|
|
self.availability.update(v)
|
|
return v
|
|
|
|
d = self._peer_finder.find_peers_for_blob(blob)
|
|
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
|
|
d.addCallback(lambda peers: _save_peer_info(blob, peers))
|
|
return d
|
|
|
|
def _update_mine(self):
|
|
def _get_peers(blobs):
|
|
dl = []
|
|
for hash in blobs:
|
|
dl.append(self._update_peers_for_blob(hash))
|
|
return defer.DeferredList(dl)
|
|
|
|
def sample(blobs):
|
|
return random.sample(blobs, min(len(blobs), 10))
|
|
|
|
start = time.time()
|
|
log.debug('==> Updating the peers for my blobs')
|
|
d = self._blob_manager.get_all_verified_blobs()
|
|
# as far as I can tell, this only is used to set _last_mean_availability
|
|
# which... seems like a very expensive operation for such little payoff.
|
|
# so taking a sample should get about the same effect as querying the entire
|
|
# list of blobs
|
|
d.addCallback(sample)
|
|
d.addCallback(_get_peers)
|
|
d.addCallback(lambda _: self._set_mean_peers())
|
|
d.addCallback(lambda _: log.debug('<== Done updating peers for my blobs. Took %s seconds',
|
|
time.time() - start))
|
|
# although unused, need to return or else the looping call
|
|
# could overrun on a previous call
|
|
return d
|
|
|
|
def _set_mean_peers(self):
|
|
num_peers = [len(self.availability[blob]) for blob in self.availability]
|
|
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
|
|
self._last_mean_availability = mean
|