2015-08-20 11:27:15 -04:00
import binascii
2016-11-03 14:42:45 -05:00
import logging
2017-10-23 15:36:50 -04:00
from twisted.internet import defer, task
2017-02-16 16:26:20 -05:00
from lbrynet.core import utils
2018-03-27 16:07:55 -04:00
from lbrynet import conf
2016-11-03 14:42:45 -05:00
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
2015-08-20 11:27:15 -04:00
2018-03-27 15:12:44 -04:00
# Announces blob hashes through ``dht_node.announceHaveBlob`` and records the
# announcement time with ``storage.update_last_announced_blob``. ``manage`` is
# wrapped in a ``task.LoopingCall`` and re-announces whatever
# ``storage.get_blobs_to_announce()`` reports.
class DHTHashAnnouncer(object):
2018-03-27 16:07:55 -04:00
def __init__(self, dht_node, storage, concurrent_announcers=None):
    """
    Wire the announcer to its DHT node and storage backend.

    :param dht_node: node used for announcing; its ``clock`` and ``peerPort``
        attributes are copied onto the announcer
    :param storage: storage object kept as ``self.storage``
    :param concurrent_announcers: limit on simultaneous announcements; any
        falsy value falls back to ``conf.settings['concurrent_announcers']``
    """
    self.dht_node, self.storage = dht_node, storage
    self.clock = dht_node.clock
    self.peer_port = dht_node.peerPort
    self.hash_queue = []
    if concurrent_announcers:
        self.concurrent_announcers = concurrent_announcers
    else:
        self.concurrent_announcers = conf.settings['concurrent_announcers']
    # the looping call around manage() runs on the node's clock
    manage_loop = task.LoopingCall(self.manage)
    manage_loop.clock = self.clock
    self._manage_lc = manage_loop
2015-08-20 11:27:15 -04:00
2018-03-27 15:12:44 -04:00
# NOTE(review): no body for start() is visible in this extract, so its
# behavior cannot be documented from here. Presumably it starts
# self._manage_lc (the LoopingCall built in __init__) -- confirm against the
# upstream file, including the interval it uses.
def start(self):
2015-08-20 11:27:15 -04:00
def stop(self):
    """
    Stop the looping call around ``manage`` if it is currently running.

    Calling this when the loop is not running is a no-op, so it is safe to
    call more than once.
    """
    # LoopingCall.stop() asserts that the loop is running, hence the guard
    if self._manage_lc.running:
        self._manage_lc.stop()
2015-08-20 11:27:15 -04:00
2017-10-23 15:36:50 -04:00
2018-03-27 15:12:44 -04:00
@defer.inlineCallbacks
def do_store(self, blob_hash):
    """
    Announce one blob hash to the DHT and record a successful announcement.

    :param blob_hash: hex-encoded blob hash; it is unhexlified before being
        passed to ``dht_node.announceHaveBlob``
    :return: Deferred firing with ``(announce_time, storing_node_ids)`` when
        at least one node stored the hash, or ``(None, [])`` when none did
    """
    storing_node_ids = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash))
    now = self.clock.seconds()
    if storing_node_ids:
        result = (now, storing_node_ids)
        yield self.storage.update_last_announced_blob(blob_hash, now)
        log.debug("Stored %s to %i peers", blob_hash[:16], len(storing_node_ids))
    else:
        # only report failure when nothing was stored; the success tuple
        # above must not be overwritten
        result = (None, [])
    # The attempt is finished either way, so drop the hash from the pending
    # queue; _show_announce_progress uses the queue length as "remaining".
    # The membership check keeps direct calls (hash never queued) safe.
    if blob_hash in self.hash_queue:
        self.hash_queue.remove(blob_hash)
    defer.returnValue(result)
2017-11-06 19:48:47 -05:00
2018-03-27 15:12:44 -04:00
def _show_announce_progress(self, size, start):
    """
    Log how many of ``size`` blobs have left the queue and the average rate.

    :param size: queue length captured when the announce run began
    :param start: clock reading, in seconds, captured when the run began
    """
    announced = size - len(self.hash_queue)
    elapsed = self.clock.seconds() - start
    log.info("Announced %i/%i blobs, %f blobs per second", announced, size, float(announced) / elapsed)
2015-08-20 11:27:15 -04:00
2018-02-20 13:33:59 -05:00
2018-03-27 15:12:44 -04:00
@defer.inlineCallbacks
def immediate_announce(self, blob_hashes):
    """
    Announce the given blob hashes now, limited to
    ``self.concurrent_announcers`` simultaneous ``do_store`` calls.

    Hashes not already pending are appended to ``self.hash_queue``, and a
    progress message is logged every 60 seconds while the run is in flight.

    :param blob_hashes: sized collection of hex-encoded blob hashes
    :return: Deferred firing with the mapping produced by
        ``utils.DeferredDict`` -- presumably ``{blob_hash: do_store result}``,
        since ``results[blob_hash][0]`` is read below
    """
    self.hash_queue.extend(b for b in blob_hashes if b not in self.hash_queue)
    log.info("Announcing %i blobs", len(self.hash_queue))
    start = self.clock.seconds()
    progress_lc = task.LoopingCall(self._show_announce_progress, len(self.hash_queue), start)
    # run the progress loop on the same clock as the manage loop
    progress_lc.clock = self.clock
    progress_lc.start(60, now=False)
    semaphore = defer.DeferredSemaphore(self.concurrent_announcers)
    try:
        results = yield utils.DeferredDict(
            {blob_hash: semaphore.run(self.do_store, blob_hash) for blob_hash in blob_hashes}
        )
    finally:
        # without this the progress loop would keep logging forever,
        # including after a failed run
        if progress_lc.running:
            progress_lc.stop()
    now = self.clock.seconds()
    elapsed = now - start
    announced_to = [blob_hash for blob_hash in results if results[blob_hash][0]]
    if len(announced_to) != len(results):
        log.debug("Failed to announce %i blobs", len(results) - len(announced_to))
    if announced_to:
        # a test clock (or a very fast run) can report zero elapsed time
        rate = int(float(len(blob_hashes)) / float(elapsed)) if elapsed else 0
        log.info('Took %s seconds to announce %i of %i attempted hashes (%f hashes per second)',
                 elapsed, len(announced_to), len(blob_hashes), rate)
    defer.returnValue(results)
2015-08-20 11:27:15 -04:00
2018-02-20 13:33:59 -05:00
2018-03-27 15:12:44 -04:00
@defer.inlineCallbacks
def manage(self):
    """
    Run one maintenance pass: announce every blob hash that
    ``storage.get_blobs_to_announce()`` reports.

    :return: Deferred that fires once the pass has finished
    """
    need_reannouncement = yield self.storage.get_blobs_to_announce()
    if need_reannouncement:
        yield self.immediate_announce(need_reannouncement)
    else:
        # only log this when there really was nothing to do
        log.debug("Nothing to announce")