Merge branch 'improve-re-reflect'

This commit is contained in:
Jack Robison 2018-05-08 14:44:54 -04:00
commit 43e4ad66d2
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
8 changed files with 77 additions and 10 deletions

View file

@ -45,6 +45,8 @@ at anytime.
* support both positional and keyword args for api calls
* `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results
* download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind
* track successful reflector uploads in sqlite to minimize how many streams are attempted by auto re-reflect
* increase the default `auto_re_reflect_interval` to a day
### Added
* virtual kademlia network and mock udp transport for dht integration tests

View file

@ -280,10 +280,10 @@ ADJUSTABLE_SETTINGS = {
'peer_port': (int, 3333),
'pointtrader_server': (str, 'http://127.0.0.1:2424'),
'reflector_port': (int, 5566),
# if reflect_uploads is True, send files to reflector (after publishing as well as a
# periodic check in the event the initial upload failed or was disconnected part way through
# if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the
# event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0)
'reflect_uploads': (bool, True),
'auto_re_reflect_interval': (int, 3600),
'auto_re_reflect_interval': (int, 86400), # set to 0 to disable
'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_list),
'run_reflector_server': (bool, False),
'sd_download_timeout': (int, 3),

View file

@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer):
self.connected_to_internet = True
self.connection_status_code = None
self.platform = None
self.current_db_revision = 7
self.current_db_revision = 8
self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None
self._session_id = conf.settings.get_session_id()

View file

@ -16,6 +16,8 @@ def migrate_db(db_dir, start, end):
from lbrynet.database.migrator.migrate5to6 import do_migration
elif current == 6:
from lbrynet.database.migrator.migrate6to7 import do_migration
elif current == 7:
from lbrynet.database.migrator.migrate7to8 import do_migration
else:
raise Exception("DB migration of version {} to {} is not available".format(current,
current+1))

View file

@ -0,0 +1,21 @@
import sqlite3
import os
def do_migration(db_dir):
db_path = os.path.join(db_dir, "lbrynet.sqlite")
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
cursor.executescript(
"""
create table reflected_stream (
sd_hash text not null,
reflector_address text not null,
timestamp integer,
primary key (sd_hash, reflector_address)
);
"""
)
connection.commit()
connection.close()

View file

@ -157,6 +157,13 @@ class SQLiteStorage(object):
amount integer not null,
address text not null
);
create table if not exists reflected_stream (
sd_hash text not null,
reflector_address text not null,
timestamp integer,
primary key (sd_hash, reflector_address)
);
"""
def __init__(self, db_dir, reactor=None):
@ -765,3 +772,24 @@ class SQLiteStorage(object):
(height, outpoint)
)
return self.db.runInteraction(_save_claim_heights)
# # # # # # # # # reflector functions # # # # # # # # #
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
if success:
return self.db.runOperation(
"insert or replace into reflected_stream values (?, ?, ?)",
(sd_hash, reflector_address, self.clock.seconds())
)
return self.db.runOperation(
"delete from reflected_stream where sd_hash=? and reflector_address=?",
(sd_hash, reflector_address)
)
def get_streams_to_re_reflect(self):
return self.run_and_return_list(
"select s.sd_hash from stream s "
"left outer join reflected_stream r on s.sd_hash=r.sd_hash "
"where r.timestamp is null or r.timestamp < ?",
self.clock.seconds() - conf.settings['auto_re_reflect_interval']
)

View file

@ -31,7 +31,7 @@ class EncryptedFileManager(object):
def __init__(self, session, sd_identifier):
self.auto_re_reflect = conf.settings['reflect_uploads']
self.auto_re_reflect = conf.settings['reflect_uploads'] and conf.settings['auto_re_reflect_interval'] > 0
self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval']
self.session = session
self.storage = session.storage
@ -140,7 +140,7 @@ class EncryptedFileManager(object):
log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval / 10)
@defer.inlineCallbacks
def _stop_lbry_file(self, lbry_file):
@ -253,7 +253,9 @@ class EncryptedFileManager(object):
def reflect_lbry_files(self):
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
ds = []
sd_hashes_to_reflect = yield self.storage.get_streams_to_re_reflect()
for lbry_file in self.lbry_files:
if lbry_file.sd_hash in sd_hashes_to_reflect:
ds.append(sem.run(reflect_file, lbry_file))
yield defer.DeferredList(ds)

View file

@ -55,6 +55,16 @@ class EncryptedFileReflectorClient(Protocol):
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def store_result(self, result):
if not self.needed_blobs or len(self.reflected_blobs) == len(self.needed_blobs):
reflected = True
else:
reflected = False
d = self.blob_manager.storage.update_reflected_stream(self.sd_hash, self.transport.getPeer().host, reflected)
d.addCallback(lambda _: result)
return d
def connectionLost(self, reason):
# make sure blob file readers get closed
self.set_not_uploading()
@ -68,15 +78,17 @@ class EncryptedFileReflectorClient(Protocol):
else:
log.info('Finished sending reflector %i blobs for %s',
len(self.reflected_blobs), self.stream_descriptor)
self.factory.finished_deferred.callback(self.reflected_blobs)
result = self.reflected_blobs
elif reason.check(error.ConnectionLost):
log.warning("Stopped reflecting %s after sending %i blobs",
self.stream_descriptor, len(self.reflected_blobs))
self.factory.finished_deferred.callback(self.reflected_blobs)
result = self.reflected_blobs
else:
log.info('Reflector finished for %s: %s', self.stream_descriptor,
reason)
self.factory.finished_deferred.callback(reason)
result = reason
self.factory.finished_deferred.addCallback(self.store_result)
self.factory.finished_deferred.callback(result)
# IConsumer stuff