Merge pull request #824 from lbryio/seeders_announce_rework

Add option for seeders to announce head blob only
This commit is contained in:
Umpei Kay Kurokawa 2017-08-28 07:45:59 -07:00 committed by GitHub
commit 33843b3a19
13 changed files with 167 additions and 34 deletions

View file

@ -12,6 +12,10 @@ at anytime.
* *
* *
### Added
* Added option to announce head blob only if seeding
*
### Fixed ### Fixed
* *
* *

View file

@ -251,6 +251,7 @@ ADJUSTABLE_SETTINGS = {
'download_directory': (str, default_download_dir), 'download_directory': (str, default_download_dir),
'download_timeout': (int, 180), 'download_timeout': (int, 180),
'is_generous_host': (bool, True), 'is_generous_host': (bool, True),
'announce_head_blobs_only': (bool, False),
'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
'lbryum_wallet_dir': (str, default_lbryum_dir), 'lbryum_wallet_dir': (str, default_lbryum_dir),
'max_connections_per_stream': (int, 5), 'max_connections_per_stream': (int, 5),

View file

@ -5,6 +5,7 @@ import sqlite3
from twisted.internet import threads, defer, reactor from twisted.internet import threads, defer, reactor
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from lbrynet import conf
from lbrynet.core.HashBlob import BlobFile, BlobFileCreator from lbrynet.core.HashBlob import BlobFile, BlobFileCreator
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -13,9 +14,18 @@ log = logging.getLogger(__name__)
class DiskBlobManager(DHTHashSupplier): class DiskBlobManager(DHTHashSupplier):
"""This class stores blobs on the hard disk"""
def __init__(self, hash_announcer, blob_dir, db_dir): def __init__(self, hash_announcer, blob_dir, db_dir):
"""
This class stores blobs on the hard disk,
blob_dir - directory where blobs are stored
db_dir - directory where sqlite database of blob information is stored
"""
DHTHashSupplier.__init__(self, hash_announcer) DHTHashSupplier.__init__(self, hash_announcer)
self.announce_head_blobs_only = conf.settings['announce_head_blobs_only']
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.db_file = os.path.join(db_dir, "blobs.db") self.db_file = os.path.join(db_dir, "blobs.db")
self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
@ -47,7 +57,7 @@ class DiskBlobManager(DHTHashSupplier):
return self._make_new_blob(blob_hash, length) return self._make_new_blob(blob_hash, length)
def get_blob_creator(self): def get_blob_creator(self):
return self.blob_creator_type(self, self.blob_dir) return self.blob_creator_type(self.blob_dir)
def _make_new_blob(self, blob_hash, length=None): def _make_new_blob(self, blob_hash, length=None):
log.debug('Making a new blob for %s', blob_hash) log.debug('Making a new blob for %s', blob_hash)
@ -61,10 +71,14 @@ class DiskBlobManager(DHTHashSupplier):
raise Exception("Hash announcer not set") raise Exception("Hash announcer not set")
@defer.inlineCallbacks @defer.inlineCallbacks
def blob_completed(self, blob, next_announce_time=None): def blob_completed(self, blob, next_announce_time=None, should_announce=True):
if next_announce_time is None: if next_announce_time is None:
next_announce_time = self.get_next_announce_time() next_announce_time = self.get_next_announce_time()
yield self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time) yield self._add_completed_blob(blob.blob_hash, blob.length,
next_announce_time, should_announce)
# we announce all blobs immediately, if announce_head_blob_only is False
# otherwise, announce only if marked as should_announce
if not self.announce_head_blobs_only or should_announce:
reactor.callLater(0, self._immediate_announce, [blob.blob_hash]) reactor.callLater(0, self._immediate_announce, [blob.blob_hash])
def completed_blobs(self, blobhashes_to_check): def completed_blobs(self, blobhashes_to_check):
@ -73,16 +87,15 @@ class DiskBlobManager(DHTHashSupplier):
def hashes_to_announce(self): def hashes_to_announce(self):
return self._get_blobs_to_announce() return self._get_blobs_to_announce()
def creator_finished(self, blob_creator): def creator_finished(self, blob_creator, should_announce):
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
assert blob_creator.blob_hash is not None assert blob_creator.blob_hash is not None
assert blob_creator.blob_hash not in self.blobs assert blob_creator.blob_hash not in self.blobs
assert blob_creator.length is not None assert blob_creator.length is not None
new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length) new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
self.blobs[blob_creator.blob_hash] = new_blob self.blobs[blob_creator.blob_hash] = new_blob
self._immediate_announce([blob_creator.blob_hash])
next_announce_time = self.get_next_announce_time() next_announce_time = self.get_next_announce_time()
d = self.blob_completed(new_blob, next_announce_time) d = self.blob_completed(new_blob, next_announce_time, should_announce)
return d return d
def immediate_announce_all_blobs(self): def immediate_announce_all_blobs(self):
@ -128,7 +141,9 @@ class DiskBlobManager(DHTHashSupplier):
" blob_hash text primary key, " + " blob_hash text primary key, " +
" blob_length integer, " + " blob_length integer, " +
" last_verified_time real, " + " last_verified_time real, " +
" next_announce_time real)") " next_announce_time real, " +
" should_announce integer)")
transaction.execute("create table if not exists download (" + transaction.execute("create table if not exists download (" +
" id integer primary key autoincrement, " + " id integer primary key autoincrement, " +
@ -147,11 +162,13 @@ class DiskBlobManager(DHTHashSupplier):
return self.db_conn.runInteraction(create_tables) return self.db_conn.runInteraction(create_tables)
@rerun_if_locked @rerun_if_locked
def _add_completed_blob(self, blob_hash, length, next_announce_time): def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length)) log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
should_announce = 1 if should_announce else 0
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"insert into blobs (blob_hash, blob_length, next_announce_time) values (?, ?, ?)", "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+
(blob_hash, length, next_announce_time) "values (?, ?, ?, ?)",
(blob_hash, length, next_announce_time, should_announce)
) )
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
return d return d
@ -173,9 +190,16 @@ class DiskBlobManager(DHTHashSupplier):
def get_and_update(transaction): def get_and_update(transaction):
timestamp = time.time() timestamp = time.time()
if self.announce_head_blobs_only is True:
r = transaction.execute("select blob_hash from blobs " +
"where next_announce_time < ? and blob_hash is not null "+
"and should_announce = 1",
(timestamp,))
else:
r = transaction.execute("select blob_hash from blobs " + r = transaction.execute("select blob_hash from blobs " +
"where next_announce_time < ? and blob_hash is not null", "where next_announce_time < ? and blob_hash is not null",
(timestamp,)) (timestamp,))
blobs = [b for b, in r.fetchall()] blobs = [b for b, in r.fetchall()]
next_announce_time = self.get_next_announce_time(len(blobs)) next_announce_time = self.get_next_announce_time(len(blobs))
transaction.execute( transaction.execute(

View file

@ -347,8 +347,7 @@ class TempBlob(HashBlob):
class HashBlobCreator(object): class HashBlobCreator(object):
def __init__(self, blob_manager): def __init__(self):
self.blob_manager = blob_manager
self._hashsum = get_lbry_hash_obj() self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0 self.len_so_far = 0
self.blob_hash = None self.blob_hash = None
@ -365,7 +364,6 @@ class HashBlobCreator(object):
self.blob_hash = self._hashsum.hexdigest() self.blob_hash = self._hashsum.hexdigest()
d = self._close() d = self._close()
if self.blob_hash is not None: if self.blob_hash is not None:
d.addCallback(lambda _: self.blob_manager.creator_finished(self))
d.addCallback(lambda _: self.blob_hash) d.addCallback(lambda _: self.blob_hash)
else: else:
d.addCallback(lambda _: None) d.addCallback(lambda _: None)
@ -384,8 +382,8 @@ class HashBlobCreator(object):
class BlobFileCreator(HashBlobCreator): class BlobFileCreator(HashBlobCreator):
def __init__(self, blob_manager, blob_dir): def __init__(self, blob_dir):
HashBlobCreator.__init__(self, blob_manager) HashBlobCreator.__init__(self)
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
@ -403,8 +401,8 @@ class BlobFileCreator(HashBlobCreator):
class TempBlobCreator(HashBlobCreator): class TempBlobCreator(HashBlobCreator):
def __init__(self, blob_manager): def __init__(self):
HashBlobCreator.__init__(self, blob_manager) HashBlobCreator.__init__(self)
# TODO: use StringIO # TODO: use StringIO
self.data_buffer = '' self.data_buffer = ''

View file

@ -1,7 +1,7 @@
from collections import defaultdict from collections import defaultdict
import json import json
import logging import logging
from twisted.internet import threads from twisted.internet import threads, defer
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError
@ -101,12 +101,15 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter):
self.blob_manager = blob_manager self.blob_manager = blob_manager
@defer.inlineCallbacks
def _write_stream_descriptor(self, raw_data): def _write_stream_descriptor(self, raw_data):
log.debug("Creating the new blob for the stream descriptor") log.debug("Creating the new blob for the stream descriptor")
blob_creator = self.blob_manager.get_blob_creator() blob_creator = self.blob_manager.get_blob_creator()
blob_creator.write(raw_data) blob_creator.write(raw_data)
log.debug("Wrote the data to the new blob") log.debug("Wrote the data to the new blob")
return blob_creator.close() sd_hash = yield blob_creator.close()
yield self.blob_manager.creator_finished(blob_creator, should_announce=True)
defer.returnValue(sd_hash)
class StreamMetadata(object): class StreamMetadata(object):

View file

@ -79,7 +79,9 @@ class BlobRequester(object):
def _send_next_request(self, peer, protocol): def _send_next_request(self, peer, protocol):
log.debug('Sending a blob request for %s and %s', peer, protocol) log.debug('Sending a blob request for %s and %s', peer, protocol)
availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager) availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager)
download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, self.wallet) head_blob_hash = self._download_manager.get_head_blob_hash()
download = DownloadRequest(self, peer, protocol, self.payment_rate_manager,
self.wallet, head_blob_hash)
price = PriceRequest(self, peer, protocol, self.payment_rate_manager) price = PriceRequest(self, peer, protocol, self.payment_rate_manager)
sent_request = False sent_request = False
@ -406,9 +408,10 @@ class PriceRequest(RequestHelper):
class DownloadRequest(RequestHelper): class DownloadRequest(RequestHelper):
"""Choose a blob and download it from a peer and also pay the peer for the data.""" """Choose a blob and download it from a peer and also pay the peer for the data."""
def __init__(self, requester, peer, protocol, payment_rate_manager, wallet): def __init__(self, requester, peer, protocol, payment_rate_manager, wallet, head_blob_hash):
RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager) RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager)
self.wallet = wallet self.wallet = wallet
self.head_blob_hash = head_blob_hash
def can_make_request(self): def can_make_request(self):
if self.protocol in self.protocol_prices: if self.protocol in self.protocol_prices:
@ -546,7 +549,8 @@ class DownloadRequest(RequestHelper):
self.update_local_score(5.0) self.update_local_score(5.0)
self.peer.update_stats('blobs_downloaded', 1) self.peer.update_stats('blobs_downloaded', 1)
self.peer.update_score(5.0) self.peer.update_score(5.0)
self.requestor.blob_manager.blob_completed(blob) should_announce = blob.blob_hash == self.head_blob_hash
self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
return arg return arg
def _download_failed(self, reason): def _download_failed(self, reason):

View file

@ -98,3 +98,6 @@ class DownloadManager(object):
return self.calculate_total_bytes() return self.calculate_total_bytes()
else: else:
return sum([b.length for b in self.needed_blobs() if b.length is not None]) return sum([b.length for b in self.needed_blobs() if b.length is not None])
def get_head_blob_hash(self):
    """Return the blob hash of the stream's head blob (position 0).

    NOTE(review): assumes ``self.blobs`` is indexable by position and
    already populated — confirm callers only invoke this after setup.
    """
    head_blob = self.blobs[0]
    return head_blob.blob_hash

View file

@ -61,18 +61,20 @@ class CryptStreamCreator(StreamCreator):
return defer.succeed(True) return defer.succeed(True)
def _finalize(self): def _finalize(self):
"""
Finalize a stream by adding an empty
blob at the end, this is to indicate that
the stream has ended. This empty blob is not
saved to the blob manager
"""
log.debug("_finalize has been called") log.debug("_finalize has been called")
self.blob_count += 1 self.blob_count += 1
iv = self.iv_generator.next() iv = self.iv_generator.next()
final_blob_creator = self.blob_manager.get_blob_creator() final_blob_creator = self.blob_manager.get_blob_creator()
log.debug("Created the finished_deferred")
final_blob = self._get_blob_maker(iv, final_blob_creator) final_blob = self._get_blob_maker(iv, final_blob_creator)
log.debug("Created the final blob")
log.debug("Calling close on final blob")
d = final_blob.close() d = final_blob.close()
d.addCallback(self._blob_finished) d.addCallback(self._blob_finished)
self.finished_deferreds.append(d) self.finished_deferreds.append(d)
log.debug("called close on final blob, returning from make_final_blob")
def _write(self, data): def _write(self, data):
def close_blob(blob): def close_blob(blob):
@ -82,14 +84,19 @@ class CryptStreamCreator(StreamCreator):
while len(data) > 0: while len(data) > 0:
if self.current_blob is None: if self.current_blob is None:
next_blob_creator = self.blob_manager.get_blob_creator() self.next_blob_creator = self.blob_manager.get_blob_creator()
self.blob_count += 1 self.blob_count += 1
iv = self.iv_generator.next() iv = self.iv_generator.next()
self.current_blob = self._get_blob_maker(iv, next_blob_creator) self.current_blob = self._get_blob_maker(iv, self.next_blob_creator)
done, num_bytes_written = self.current_blob.write(data) done, num_bytes_written = self.current_blob.write(data)
data = data[num_bytes_written:] data = data[num_bytes_written:]
if done is True: if done is True:
close_blob(self.current_blob) should_announce = self.blob_count == 0
d = self.current_blob.close()
d.addCallback(self._blob_finished)
d.addCallback(lambda _: self.blob_manager.creator_finished(
self.next_blob_creator, should_announce))
self.finished_deferreds.append(d)
self.current_blob = None self.current_blob = None
def _get_blob_maker(self, iv, blob_creator): def _get_blob_maker(self, iv, blob_creator):

View file

@ -197,7 +197,7 @@ class Daemon(AuthJSONRPCServer):
self.connected_to_internet = True self.connected_to_internet = True
self.connection_status_code = None self.connection_status_code = None
self.platform = None self.platform = None
self.current_db_revision = 3 self.current_db_revision = 4
self.db_revision_file = conf.settings.get_db_revision_filename() self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None self.session = None
self.uploaded_temp_files = [] self.uploaded_temp_files = []

View file

@ -10,6 +10,9 @@ def migrate_db(db_dir, start, end):
elif current == 2: elif current == 2:
from lbrynet.db_migrator.migrate2to3 import do_migration from lbrynet.db_migrator.migrate2to3 import do_migration
do_migration(db_dir) do_migration(db_dir)
elif current == 3:
from lbrynet.db_migrator.migrate3to4 import do_migration
do_migration(db_dir)
else: else:
raise Exception( raise Exception(
"DB migration of version {} to {} is not available".format(current, current+1)) "DB migration of version {} to {} is not available".format(current, current+1))

View file

@ -0,0 +1,85 @@
import sqlite3
import os
import logging
# Module-level logger for migration progress and diagnostics.
log = logging.getLogger(__name__)
def do_migration(db_dir):
    """Migrate the databases in ``db_dir`` from schema revision 3 to 4.

    Delegates to :func:`migrate_blobs_db`, which adds the
    ``should_announce`` column to blobs.db and back-fills it for
    sd blobs and head blobs.
    """
    log.info("Doing the migration")
    migrate_blobs_db(db_dir)
    log.info("Migration succeeded")
def migrate_blobs_db(db_dir):
    """
    We migrate the blobs.db used in BlobManager to have a "should_announce" column,
    and set this to True for blobs that are sd hashes or head blobs (first blob in stream).

    db_dir - directory containing blobs.db and (optionally) lbryfile_info.db

    The migration is skipped on fresh installs.  Changes are committed and
    both sqlite connections are closed on every exit path (the original
    code never closed them, and returned without committing the ALTER
    TABLE when lbryfile_info.db was missing).
    """
    blobs_db = os.path.join(db_dir, "blobs.db")
    lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db')

    # skip migration on fresh installs
    if not os.path.isfile(blobs_db) and not os.path.isfile(lbryfile_info_db):
        return

    # if blobs.db doesn't exist, skip migration
    if not os.path.isfile(blobs_db):
        log.error("blobs.db was not found but lbryfile_info.db was found, skipping migration")
        return

    blobs_db_file = sqlite3.connect(blobs_db)
    try:
        blobs_db_cursor = blobs_db_file.cursor()

        # check if the new column exists (it shouldn't) and create it
        try:
            blobs_db_cursor.execute("SELECT should_announce FROM blobs")
        except sqlite3.OperationalError:
            blobs_db_cursor.execute(
                "ALTER TABLE blobs ADD COLUMN should_announce integer NOT NULL DEFAULT 0")
        else:
            # log.warning: log.warn is a deprecated alias
            log.warning("should_announce already exists somehow, proceeding anyways")

        # if lbryfile_info.db doesn't exist, skip marking blobs as should_announce = True
        if not os.path.isfile(lbryfile_info_db):
            log.error("lbryfile_info.db was not found, skipping check for should_announce")
            # persist the ALTER TABLE even on this early return
            blobs_db_file.commit()
            return

        lbryfile_info_file = sqlite3.connect(lbryfile_info_db)
        try:
            lbryfile_info_cursor = lbryfile_info_file.cursor()

            # find blobs that are stream descriptors
            lbryfile_info_cursor.execute('SELECT * FROM lbry_file_descriptors')
            should_announce_blob_hashes = [(d[0],) for d in lbryfile_info_cursor.fetchall()]

            # find blobs that are the first blob in a stream
            lbryfile_info_cursor.execute('SELECT * FROM lbry_file_blobs WHERE position = 0')
            should_announce_blob_hashes.extend(
                (b[0],) for b in lbryfile_info_cursor.fetchall())
        finally:
            lbryfile_info_file.close()

        # now mark them as should_announce = True
        blobs_db_cursor.executemany('UPDATE blobs SET should_announce=1 WHERE blob_hash=?',
                                    should_announce_blob_hashes)

        # Now run some final checks here to make sure migration succeeded
        try:
            blobs_db_cursor.execute("SELECT should_announce FROM blobs")
        except sqlite3.OperationalError:
            raise Exception('Migration failed, cannot find should_announce')

        blobs_db_cursor.execute("SELECT * FROM blobs WHERE should_announce=1")
        blobs = blobs_db_cursor.fetchall()
        if len(blobs) != len(should_announce_blob_hashes):
            # best-effort check: a hash listed in lbryfile_info.db may be
            # absent from blobs.db, so this is logged rather than raised
            log.error("Some how not all blobs were marked as announceable")

        blobs_db_file.commit()
    finally:
        blobs_db_file.close()

View file

@ -9,6 +9,7 @@ from tests.util import random_lbry_hash
from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.Peer import Peer from lbrynet.core.Peer import Peer
from lbrynet import conf
from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.cryptoutils import get_lbry_hash_obj
from twisted.trial import unittest from twisted.trial import unittest
@ -16,6 +17,7 @@ from twisted.internet import defer
class BlobManagerTest(unittest.TestCase): class BlobManagerTest(unittest.TestCase):
def setUp(self): def setUp(self):
conf.initialize_settings()
self.blob_dir = tempfile.mkdtemp() self.blob_dir = tempfile.mkdtemp()
self.db_dir = tempfile.mkdtemp() self.db_dir = tempfile.mkdtemp()
hash_announcer = DummyHashAnnouncer() hash_announcer = DummyHashAnnouncer()

View file

@ -12,7 +12,6 @@ from lbrynet.core import Session
from lbrynet.core.server import DHTHashAnnouncer from lbrynet.core.server import DHTHashAnnouncer
from lbrynet.file_manager import EncryptedFileCreator from lbrynet.file_manager import EncryptedFileCreator
from lbrynet.file_manager import EncryptedFileManager from lbrynet.file_manager import EncryptedFileManager
from tests import mocks from tests import mocks
from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir