diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0eaa24285..3e5b407ee 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer): self.connected_to_internet = True self.connection_status_code = None self.platform = None - self.current_db_revision = 8 + self.current_db_revision = 9 self.db_revision_file = conf.settings.get_db_revision_filename() self.session = None self._session_id = conf.settings.get_session_id() @@ -244,7 +244,7 @@ class Daemon(AuthJSONRPCServer): yield self._start_analytics() yield add_lbry_file_to_sd_identifier(self.sd_identifier) yield self._setup_stream_identifier() - yield self._setup_lbry_file_manager(verify_streams=migrated) + yield self._setup_lbry_file_manager() yield self._setup_query_handlers() yield self._setup_server() log.info("Starting balance: " + str(self.session.wallet.get_balance())) @@ -512,11 +512,11 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(migrated) @defer.inlineCallbacks - def _setup_lbry_file_manager(self, verify_streams): + def _setup_lbry_file_manager(self): log.info('Starting the file manager') self.startup_status = STARTUP_STAGES[3] self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) - yield self.lbry_file_manager.setup(verify_streams) + yield self.lbry_file_manager.setup() log.info('Done setting up file manager') def _start_analytics(self): diff --git a/lbrynet/database/migrator/dbmigrator.py b/lbrynet/database/migrator/dbmigrator.py index ab1519380..196263f0a 100644 --- a/lbrynet/database/migrator/dbmigrator.py +++ b/lbrynet/database/migrator/dbmigrator.py @@ -18,6 +18,8 @@ def migrate_db(db_dir, start, end): from lbrynet.database.migrator.migrate6to7 import do_migration elif current == 7: from lbrynet.database.migrator.migrate7to8 import do_migration + elif current == 8: + from lbrynet.database.migrator.migrate8to9 import do_migration else: raise Exception("DB migration of version {} to {} is not available".format(current, current+1)) diff --git a/lbrynet/database/migrator/migrate8to9.py b/lbrynet/database/migrator/migrate8to9.py new file mode 100644 index 000000000..a518e9899 --- /dev/null +++ b/lbrynet/database/migrator/migrate8to9.py @@ -0,0 +1,54 @@ +import sqlite3 +import logging +import os + +from lbrynet.core.Error import InvalidStreamDescriptorError +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor +from lbrynet.cryptstream.CryptBlob import CryptBlobInfo + +log = logging.getLogger(__name__) + + +def do_migration(db_dir): + db_path = os.path.join(db_dir, "lbrynet.sqlite") + blob_dir = os.path.join(db_dir, "blobfiles") + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + + query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream" + streams = cursor.execute(query).fetchall() + + blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s " + "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall() + blobs_by_stream = {} + for stream_hash, position, iv, blob_hash, blob_length in blobs: + blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv)) + + for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams: + sd_info = format_sd_info( + EncryptedFileStreamType, stream_name, stream_key, + suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash]) + ) + try: + validate_descriptor(sd_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + sd_hash, err.message) + blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]] + delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir) + + connection.commit() + connection.close() + + +def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir): + transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,)) + transaction.execute("delete from file where stream_hash=? ", (stream_hash, )) + transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, )) + transaction.execute("delete from stream where stream_hash=? ", (stream_hash, )) + transaction.execute("delete from blob where blob_hash=?", (sd_hash, )) + for blob_hash in blob_hashes: + transaction.execute("delete from blob where blob_hash=?", (blob_hash, )) + file_path = os.path.join(blob_dir, blob_hash) + if os.path.isfile(file_path): + os.unlink(file_path) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index d28006dbd..02245c39c 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -6,12 +6,11 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure -from lbrynet.core.Error import InvalidStreamDescriptorError from lbrynet.reflector.reupload import reflect_file from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -42,9 +41,9 @@ class EncryptedFileManager(object): self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) @defer.inlineCallbacks - def setup(self, verify_streams=False): + def setup(self): yield self._add_to_sd_identifier() - yield self._start_lbry_files(verify_streams) + yield self._start_lbry_files() log.info("Started file manager") def get_lbry_file_status(self, lbry_file): @@ -96,8 +95,7 @@ class EncryptedFileManager(object): suggested_file_name=suggested_file_name ) - @defer.inlineCallbacks - def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info): + def _start_lbry_file(self, file_info, payment_rate_manager, claim_info): lbry_file = self._get_lbry_file( file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], @@ -106,42 +104,26 @@ class EncryptedFileManager(object): if claim_info: lbry_file.set_claim_info(claim_info) try: - # verify if the stream is valid (we might have downloaded an invalid stream - # in the past when the validation check didn't work. This runs after every - # migration to ensure blobs migrated from that past version gets verified) - if verify_stream: - stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) - validate_descriptor(stream_info) - except InvalidStreamDescriptorError as err: - log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", - lbry_file.sd_hash, err.message) - yield lbry_file.delete_data() - yield self.session.storage.delete_stream(lbry_file.stream_hash) - else: - try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - if len(self.lbry_files) % 500 == 0: - log.info("Started %i files", len(self.lbry_files)) - except Exception: - log.warning("Failed to start %i", file_info.get('rowid')) + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info + self.lbry_files.append(lbry_file) + if len(self.lbry_files) % 500 == 0: + log.info("Started %i files", len(self.lbry_files)) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) @defer.inlineCallbacks - def _start_lbry_files(self, verify_streams): + def _start_lbry_files(self): files = yield self.session.storage.get_all_lbry_files() claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) log.info("Starting %i files", len(files)) - dl = [] for file_info in files: claim_info = claim_infos.get(file_info['stream_hash']) - dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info)) - - yield defer.DeferredList(dl) + self._start_lbry_file(file_info, payment_rate_manager, claim_info) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: