diff --git a/CHANGELOG.md b/CHANGELOG.md index b232d1b5f..eacd1d4dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ at anytime. * support both positional and keyword args for api calls * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results * download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind + * track successful reflector uploads in sqlite to minimize how many streams are attempted by auto re-reflect + * increase the default `auto_re_reflect_interval` to a day ### Added * virtual kademlia network and mock udp transport for dht integration tests diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 0be7a423e..1577794d9 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -280,10 +280,10 @@ ADJUSTABLE_SETTINGS = { 'peer_port': (int, 3333), 'pointtrader_server': (str, 'http://127.0.0.1:2424'), 'reflector_port': (int, 5566), - # if reflect_uploads is True, send files to reflector (after publishing as well as a - # periodic check in the event the initial upload failed or was disconnected part way through + # if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the + # event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0) 'reflect_uploads': (bool, True), - 'auto_re_reflect_interval': (int, 3600), + 'auto_re_reflect_interval': (int, 86400), # set to 0 to disable 'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_list), 'run_reflector_server': (bool, False), 'sd_download_timeout': (int, 3), diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index df39a6137..43c67e0d7 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer): self.connected_to_internet = True self.connection_status_code = None self.platform = None - self.current_db_revision = 7 + self.current_db_revision = 8 self.db_revision_file = conf.settings.get_db_revision_filename() self.session = None self._session_id = conf.settings.get_session_id() diff --git a/lbrynet/database/migrator/dbmigrator.py b/lbrynet/database/migrator/dbmigrator.py index a4057db38..ab1519380 100644 --- a/lbrynet/database/migrator/dbmigrator.py +++ b/lbrynet/database/migrator/dbmigrator.py @@ -16,6 +16,8 @@ def migrate_db(db_dir, start, end): from lbrynet.database.migrator.migrate5to6 import do_migration elif current == 6: from lbrynet.database.migrator.migrate6to7 import do_migration + elif current == 7: + from lbrynet.database.migrator.migrate7to8 import do_migration else: raise Exception("DB migration of version {} to {} is not available".format(current, current+1)) diff --git a/lbrynet/database/migrator/migrate7to8.py b/lbrynet/database/migrator/migrate7to8.py new file mode 100644 index 000000000..d048224e9 --- /dev/null +++ b/lbrynet/database/migrator/migrate7to8.py @@ -0,0 +1,21 @@ +import sqlite3 +import os + + +def do_migration(db_dir): + db_path = os.path.join(db_dir, "lbrynet.sqlite") + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + + cursor.executescript( + """ + create table reflected_stream ( + sd_hash text not null, + reflector_address text not null, + timestamp integer, + primary key (sd_hash, reflector_address) + ); + """ + ) + connection.commit() + connection.close() diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index e3bdd649c..122f7a866 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -157,6 +157,13 @@ class SQLiteStorage(object): amount integer not null, address text not null ); + + create table if not exists reflected_stream ( + sd_hash text not null, + reflector_address text not null, + timestamp integer, + primary key (sd_hash, reflector_address) + ); """ def __init__(self, db_dir, reactor=None): @@ -765,3 +772,24 @@ class SQLiteStorage(object): (height, outpoint) ) return self.db.runInteraction(_save_claim_heights) + + # # # # # # # # # reflector functions # # # # # # # # # + + def update_reflected_stream(self, sd_hash, reflector_address, success=True): + if success: + return self.db.runOperation( + "insert or replace into reflected_stream values (?, ?, ?)", + (sd_hash, reflector_address, self.clock.seconds()) + ) + return self.db.runOperation( + "delete from reflected_stream where sd_hash=? and reflector_address=?", + (sd_hash, reflector_address) + ) + + def get_streams_to_re_reflect(self): + return self.run_and_return_list( + "select s.sd_hash from stream s " + "left outer join reflected_stream r on s.sd_hash=r.sd_hash " + "where r.timestamp is null or r.timestamp < ?", + self.clock.seconds() - conf.settings['auto_re_reflect_interval'] + ) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 5f91eae01..0fffd6e00 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -31,7 +31,7 @@ class EncryptedFileManager(object): def __init__(self, session, sd_identifier): - self.auto_re_reflect = conf.settings['reflect_uploads'] + self.auto_re_reflect = conf.settings['reflect_uploads'] and conf.settings['auto_re_reflect_interval'] > 0 self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval'] self.session = session self.storage = session.storage @@ -140,7 +140,7 @@ class EncryptedFileManager(object): log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: - safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) + safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval / 10) @defer.inlineCallbacks def _stop_lbry_file(self, lbry_file): @@ -253,8 +253,10 @@ class EncryptedFileManager(object): def reflect_lbry_files(self): sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) ds = [] + sd_hashes_to_reflect = yield self.storage.get_streams_to_re_reflect() for lbry_file in self.lbry_files: - ds.append(sem.run(reflect_file, lbry_file)) + if lbry_file.sd_hash in sd_hashes_to_reflect: + ds.append(sem.run(reflect_file, lbry_file)) yield defer.DeferredList(ds) @defer.inlineCallbacks diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 329eeb5e0..09c4694c4 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -55,6 +55,16 @@ class EncryptedFileReflectorClient(Protocol): d.addCallback(lambda _: self.send_next_request()) d.addErrback(self.response_failure_handler) + def store_result(self, result): + if not self.needed_blobs or len(self.reflected_blobs) == len(self.needed_blobs): + reflected = True + else: + reflected = False + + d = self.blob_manager.storage.update_reflected_stream(self.sd_hash, self.transport.getPeer().host, reflected) + d.addCallback(lambda _: result) + return d + def connectionLost(self, reason): # make sure blob file readers get closed self.set_not_uploading() @@ -68,15 +78,17 @@ class EncryptedFileReflectorClient(Protocol): else: log.info('Finished sending reflector %i blobs for %s', len(self.reflected_blobs), self.stream_descriptor) - self.factory.finished_deferred.callback(self.reflected_blobs) + result = self.reflected_blobs elif reason.check(error.ConnectionLost): log.warning("Stopped reflecting %s after sending %i blobs", self.stream_descriptor, len(self.reflected_blobs)) - self.factory.finished_deferred.callback(self.reflected_blobs) + result = self.reflected_blobs else: log.info('Reflector finished for %s: %s', self.stream_descriptor, reason) - self.factory.finished_deferred.callback(reason) + result = reason + self.factory.finished_deferred.addCallback(self.store_result) + self.factory.finished_deferred.callback(result) # IConsumer stuff