add support for should_announce feature

Kay Kurokawa 2017-08-02 12:11:41 -04:00
parent 7e95169fbe
commit 0639bb9865
6 changed files with 47 additions and 16 deletions
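
In short: this commit adds an announce_head_blobs_only setting (default False) and a per-blob should_announce flag. With the setting off, every completed blob is still announced to the DHT immediately, as before; with it on, only blobs explicitly flagged should_announce are announced. A minimal sketch of the gate that DiskBlobManager.blob_completed applies below (illustrative standalone function, not code from the diff):

    def should_immediately_announce(announce_head_blobs_only, should_announce):
        # Announce everything when the restriction is off; otherwise announce
        # only blobs that a producer or downloader explicitly flagged.
        return (not announce_head_blobs_only) or should_announce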

View file

@@ -251,6 +251,7 @@ ADJUSTABLE_SETTINGS = {
     'download_directory': (str, default_download_dir),
     'download_timeout': (int, 180),
     'is_generous_host': (bool, True),
+    'announce_head_blobs_only': (bool, False),
     'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
     'lbryum_wallet_dir': (str, default_lbryum_dir),
     'max_connections_per_stream': (int, 5),
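
A small usage sketch of the new flag, using only the conf API that already appears in this diff (initialize_settings and dict-style settings access); since the default is False, existing installations keep announcing every blob:

    from lbrynet import conf

    conf.initialize_settings()
    # (bool, False) above makes False the default value and bool the expected type.
    announce_head_only = conf.settings['announce_head_blobs_only']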

View file

@@ -5,6 +5,7 @@ import sqlite3
 from twisted.internet import threads, defer, reactor
 from twisted.enterprise import adbapi
+from lbrynet import conf
 from lbrynet.core.HashBlob import BlobFile, BlobFileCreator
 from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
 from lbrynet.core.sqlite_helpers import rerun_if_locked
@@ -13,9 +14,18 @@ log = logging.getLogger(__name__)
 class DiskBlobManager(DHTHashSupplier):
-    """This class stores blobs on the hard disk"""
     def __init__(self, hash_announcer, blob_dir, db_dir):
+        """
+        This class stores blobs on the hard disk,
+        blob_dir - directory where blobs are stored
+        db_dir - directory where sqlite database of blob information is stored
+        """
         DHTHashSupplier.__init__(self, hash_announcer)
+        self.announce_head_blobs_only = conf.settings['announce_head_blobs_only']
         self.blob_dir = blob_dir
         self.db_file = os.path.join(db_dir, "blobs.db")
         self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
@@ -61,11 +71,15 @@ class DiskBlobManager(DHTHashSupplier):
         raise Exception("Hash announcer not set")

     @defer.inlineCallbacks
-    def blob_completed(self, blob, next_announce_time=None):
+    def blob_completed(self, blob, next_announce_time=None, should_announce=False):
         if next_announce_time is None:
             next_announce_time = self.get_next_announce_time()
-        yield self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time)
-        reactor.callLater(0, self._immediate_announce, [blob.blob_hash])
+        yield self._add_completed_blob(blob.blob_hash, blob.length,
+                                       next_announce_time, should_announce)
+        # we announce all blobs immediately, if announce_head_blobs_only is False
+        # otherwise, announce only if marked as should_announce
+        if not self.announce_head_blobs_only or should_announce:
+            reactor.callLater(0, self._immediate_announce, [blob.blob_hash])

     def completed_blobs(self, blobhashes_to_check):
         return self._completed_blobs(blobhashes_to_check)
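
Since should_announce defaults to False, announcing becomes opt-in once the restriction is enabled. A hypothetical caller sketch (blob_manager and the blob objects are assumed to exist already):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def store_stream_blobs(blob_manager, head_blob, data_blob):
        # The head blob is announced even with announce_head_blobs_only enabled...
        yield blob_manager.blob_completed(head_blob, should_announce=True)
        # ...while unflagged blobs are only announced when the setting is off.
        yield blob_manager.blob_completed(data_blob)
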
@@ -73,16 +87,15 @@ class DiskBlobManager(DHTHashSupplier):
     def hashes_to_announce(self):
         return self._get_blobs_to_announce()

-    def creator_finished(self, blob_creator):
+    def creator_finished(self, blob_creator, should_announce):
         log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
         assert blob_creator.blob_hash is not None
         assert blob_creator.blob_hash not in self.blobs
         assert blob_creator.length is not None
         new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
         self.blobs[blob_creator.blob_hash] = new_blob
-        self._immediate_announce([blob_creator.blob_hash])
         next_announce_time = self.get_next_announce_time()
-        d = self.blob_completed(new_blob, next_announce_time)
+        d = self.blob_completed(new_blob, next_announce_time, should_announce)
         return d

     def immediate_announce_all_blobs(self):
@@ -150,11 +163,13 @@ class DiskBlobManager(DHTHashSupplier):
         return self.db_conn.runInteraction(create_tables)

     @rerun_if_locked
-    def _add_completed_blob(self, blob_hash, length, next_announce_time):
+    def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
         log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
+        should_announce = 1 if should_announce else 0
         d = self.db_conn.runQuery(
-            "insert into blobs (blob_hash, blob_length, next_announce_time) values (?, ?, ?)",
-            (blob_hash, length, next_announce_time)
+            "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+
+            "values (?, ?, ?, ?)",
+            (blob_hash, length, next_announce_time, should_announce)
         )
         d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
         return d
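
The insert above assumes the blobs table already has a should_announce integer column; the matching create_tables change is not visible in these hunks. A hypothetical one-off migration for an existing blobs.db could look like this (column name taken from the query above, everything else assumed):

    import sqlite3

    db = sqlite3.connect("blobs.db")
    # Add the flag column, defaulting existing rows to "do not announce".
    db.execute("alter table blobs add column should_announce integer not null default 0")
    db.commit()
    db.close()
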
@@ -176,9 +191,16 @@ class DiskBlobManager(DHTHashSupplier):
         def get_and_update(transaction):
             timestamp = time.time()
-            r = transaction.execute("select blob_hash from blobs " +
-                                    "where next_announce_time < ? and blob_hash is not null",
-                                    (timestamp,))
+            if self.announce_head_blobs_only is True:
+                r = transaction.execute("select blob_hash from blobs " +
+                                        "where next_announce_time < ? and blob_hash is not null "+
+                                        "and should_announce = 1",
+                                        (timestamp,))
+            else:
+                r = transaction.execute("select blob_hash from blobs " +
+                                        "where next_announce_time < ? and blob_hash is not null",
+                                        (timestamp,))
             blobs = [b for b, in r.fetchall()]
             next_announce_time = self.get_next_announce_time(len(blobs))
             transaction.execute(

View file

@@ -108,7 +108,7 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter):
         blob_creator.write(raw_data)
         log.debug("Wrote the data to the new blob")
         sd_hash = yield blob_creator.close()
-        yield self.blob_manager.creator_finished(blob_creator)
+        yield self.blob_manager.creator_finished(blob_creator, should_announce=True)
         defer.returnValue(sd_hash)

View file

@@ -79,7 +79,9 @@ class BlobRequester(object):
     def _send_next_request(self, peer, protocol):
         log.debug('Sending a blob request for %s and %s', peer, protocol)
         availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager)
-        download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, self.wallet)
+        head_blob_hash = self._download_manager.get_head_blob_hash()
+        download = DownloadRequest(self, peer, protocol, self.payment_rate_manager,
+                                   self.wallet, head_blob_hash)
         price = PriceRequest(self, peer, protocol, self.payment_rate_manager)

         sent_request = False
@@ -406,9 +408,10 @@ class PriceRequest(RequestHelper):
 class DownloadRequest(RequestHelper):
     """Choose a blob and download it from a peer and also pay the peer for the data."""
-    def __init__(self, requester, peer, protocol, payment_rate_manager, wallet):
+    def __init__(self, requester, peer, protocol, payment_rate_manager, wallet, head_blob_hash):
         RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager)
         self.wallet = wallet
+        self.head_blob_hash = head_blob_hash

     def can_make_request(self):
         if self.protocol in self.protocol_prices:
@@ -546,7 +549,8 @@ class DownloadRequest(RequestHelper):
         self.update_local_score(5.0)
         self.peer.update_stats('blobs_downloaded', 1)
         self.peer.update_score(5.0)
-        self.requestor.blob_manager.blob_completed(blob)
+        should_announce = blob.blob_hash == self.head_blob_hash
+        self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
         return arg

     def _download_failed(self, reason):

View file

@@ -91,9 +91,11 @@ class CryptStreamCreator(StreamCreator):
             done, num_bytes_written = self.current_blob.write(data)
             data = data[num_bytes_written:]
             if done is True:
+                should_announce = self.blob_count == 0
                 d = self.current_blob.close()
                 d.addCallback(self._blob_finished)
-                d.addCallback(lambda _: self.blob_manager.creator_finished(self.next_blob_creator))
+                d.addCallback(lambda _: self.blob_manager.creator_finished(
+                    self.next_blob_creator, should_announce))
                 self.finished_deferreds.append(d)
                 self.current_blob = None
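
Taken together with the StreamDescriptor and BlobRequester hunks, the flag ends up set for exactly two kinds of blobs: the stream descriptor blob (creator_finished(..., should_announce=True)) and the head blob of a stream (blob_count == 0 when publishing, blob_hash == head_blob_hash when downloading). As a sketch, with illustrative names:

    def wants_announce(is_sd_blob, is_head_blob):
        # Mirrors the call sites touched in this commit: descriptor and head
        # blobs are flagged; everything else relies on the setting being off.
        return is_sd_blob or is_head_blob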

View file

@@ -9,6 +9,7 @@ from tests.util import random_lbry_hash
 from lbrynet.core.BlobManager import DiskBlobManager
 from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
 from lbrynet.core.Peer import Peer
+from lbrynet import conf
 from lbrynet.core.cryptoutils import get_lbry_hash_obj
 from twisted.trial import unittest
@@ -16,6 +17,7 @@ from twisted.internet import defer
 class BlobManagerTest(unittest.TestCase):
     def setUp(self):
+        conf.initialize_settings()
         self.blob_dir = tempfile.mkdtemp()
         self.db_dir = tempfile.mkdtemp()
         hash_announcer = DummyHashAnnouncer()
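
The setUp change is needed because DiskBlobManager now reads conf.settings['announce_head_blobs_only'] in its constructor, so the unit test has to initialize settings first. A minimal, hypothetical standalone check of the shipped default:

    from lbrynet import conf

    conf.initialize_settings()
    # The new setting ships disabled, so tests exercise the old
    # announce-everything behaviour unless they override it.
    assert conf.settings['announce_head_blobs_only'] is False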