diff --git a/CHANGELOG.md b/CHANGELOG.md
index 46e257ae8..bcc8008b0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,8 @@ at anytime.
  * changed txrequests for treq
  * changed cryptography version to 2.2.2
  * removed pycrypto dependency, replacing all calls to cryptography
+ * full verification of streams only during migration instead of every startup
+ * database batching functions for starting up the file manager
  * several internal dht functions to use inlineCallbacks
  * `DHTHashAnnouncer` and `Node` manage functions to use `LoopingCall`s instead of scheduling with `callLater`.
  * `store` kademlia rpc method to block on the call finishing and to return storing peer information
diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 77f4790d2..0304e458a 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -204,7 +204,7 @@ class Daemon(AuthJSONRPCServer):
         self.connected_to_internet = True
         self.connection_status_code = None
         self.platform = None
-        self.current_db_revision = 8
+        self.current_db_revision = 9
         self.db_revision_file = conf.settings.get_db_revision_filename()
         self.session = None
         self._session_id = conf.settings.get_session_id()
diff --git a/lbrynet/database/migrator/dbmigrator.py b/lbrynet/database/migrator/dbmigrator.py
index ab1519380..196263f0a 100644
--- a/lbrynet/database/migrator/dbmigrator.py
+++ b/lbrynet/database/migrator/dbmigrator.py
@@ -18,6 +18,8 @@ def migrate_db(db_dir, start, end):
         from lbrynet.database.migrator.migrate6to7 import do_migration
     elif current == 7:
         from lbrynet.database.migrator.migrate7to8 import do_migration
+    elif current == 8:
+        from lbrynet.database.migrator.migrate8to9 import do_migration
     else:
         raise Exception("DB migration of version {} to {} is not available".format(current, current+1))
diff --git a/lbrynet/database/migrator/migrate8to9.py b/lbrynet/database/migrator/migrate8to9.py
new file mode 100644
index 000000000..a518e9899
--- /dev/null
+++ b/lbrynet/database/migrator/migrate8to9.py
@@ -0,0 +1,54 @@
+import sqlite3
+import logging
+import os
+
+from lbrynet.core.Error import InvalidStreamDescriptorError
+from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
+from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
+
+log = logging.getLogger(__name__)
+
+
+def do_migration(db_dir):
+    db_path = os.path.join(db_dir, "lbrynet.sqlite")
+    blob_dir = os.path.join(db_dir, "blobfiles")
+    connection = sqlite3.connect(db_path)
+    cursor = connection.cursor()
+
+    query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream"
+    streams = cursor.execute(query).fetchall()
+
+    blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s "
+                           "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
+    blobs_by_stream = {}
+    for stream_hash, position, iv, blob_hash, blob_length in blobs:
+        blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
+
+    for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
+        sd_info = format_sd_info(
+            EncryptedFileStreamType, stream_name, stream_key,
+            suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
+        )
+        try:
+            validate_descriptor(sd_info)
+        except InvalidStreamDescriptorError as err:
+            log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
+                        sd_hash, err.message)
+            blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
+            delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
+
+    connection.commit()
+    connection.close()
+
+
+def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir):
+    transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,))
+    transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
+    transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
+    transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
+    transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
+    for blob_hash in blob_hashes:
+        transaction.execute("delete from blob where blob_hash=?", (blob_hash, ))
+        file_path = os.path.join(blob_dir, blob_hash)
+        if os.path.isfile(file_path):
+            os.unlink(file_path)
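
The new migration can be exercised on its own against a copy of a data directory before a daemon ever touches it. A minimal sketch, assuming the layout `do_migration` expects (`lbrynet.sqlite` plus a `blobfiles/` subdirectory); the source path below is hypothetical, not part of this change:

```python
# Sketch: run the 8 -> 9 migration against a throwaway copy of the database.
# Invalid streams are logged by do_migration and cleaned out of the copy.
import logging
import os
import shutil
import tempfile

from lbrynet.database.migrator.migrate8to9 import do_migration

logging.basicConfig(level=logging.INFO)

source_db = os.path.expanduser("~/.lbrynet/lbrynet.sqlite")  # hypothetical location
work_dir = tempfile.mkdtemp()
shutil.copy(source_db, work_dir)
os.mkdir(os.path.join(work_dir, "blobfiles"))  # delete_stream skips blob files that are absent

do_migration(work_dir)
```
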
diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 122f7a866..aae8f180a 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -552,7 +552,7 @@ class SQLiteStorage(object):
         )
         return self.db.runInteraction(_save_support)
 
-    def get_supports(self, claim_id):
+    def get_supports(self, *claim_ids):
         def _format_support(outpoint, supported_id, amount, address):
             return {
                 "txid": outpoint.split(":")[0],
@@ -563,10 +563,15 @@ class SQLiteStorage(object):
             }
 
         def _get_supports(transaction):
+            if len(claim_ids) == 1:
+                bind = "=?"
+            else:
+                bind = "in ({})".format(','.join('?' for _ in range(len(claim_ids))))
             return [
                 _format_support(*support_info)
                 for support_info in transaction.execute(
-                    "select * from support where claim_id=?", (claim_id, )
+                    "select * from support where claim_id {}".format(bind),
+                    tuple(claim_ids)
                 ).fetchall()
             ]
 
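
`get_supports` is now variadic: it builds one `?` placeholder per claim id, so the ids themselves are still bound by sqlite3 rather than formatted into the SQL string. The same pattern in isolation, as a standalone sketch against a throwaway table:

```python
# One-placeholder-per-value IN clause, as used by get_supports above; only
# the placeholder list is formatted into the query, the values stay bound.
import sqlite3

def fetch_supports(cursor, claim_ids):
    if len(claim_ids) == 1:
        bind = "=?"
    else:
        bind = "in ({})".format(','.join('?' for _ in claim_ids))
    return cursor.execute(
        "select * from support where claim_id {}".format(bind), tuple(claim_ids)
    ).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("create table support (claim_id text, amount integer)")
conn.executemany("insert into support values (?, ?)", [("a", 1), ("b", 2), ("c", 3)])
print(fetch_supports(conn.cursor(), ["a"]))       # single id  -> "claim_id =?"
print(fetch_supports(conn.cursor(), ["a", "c"]))  # many ids   -> "claim_id in (?,?)"
```

One caveat: SQLite caps the number of bound parameters per statement (999 by default in builds of this era), so a very large batch would eventually need chunking.
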
@@ -683,51 +688,82 @@ class SQLiteStorage(object):
 
     @defer.inlineCallbacks
     def get_content_claim(self, stream_hash, include_supports=True):
-        def _get_content_claim(transaction):
-            claim_id = transaction.execute(
-                "select claim.claim_outpoint from content_claim "
-                "inner join claim on claim.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash=? "
-                "order by claim.rowid desc", (stream_hash, )
+        def _get_claim_from_stream_hash(transaction):
+            claim_info = transaction.execute(
+                "select c.*, "
+                "case when c.channel_claim_id is not null then "
+                "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                "else null end as channel_name from content_claim "
+                "inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
+                "and content_claim.stream_hash=? order by c.rowid desc", (stream_hash,)
             ).fetchone()
-            if not claim_id:
+            if not claim_info:
                 return None
-            return claim_id[0]
+            channel_name = claim_info[-1]
+            result = _format_claim_response(*claim_info[:-1])
+            if channel_name:
+                result['channel_name'] = channel_name
+            return result
 
-        content_claim_outpoint = yield self.db.runInteraction(_get_content_claim)
-        result = None
-        if content_claim_outpoint:
-            result = yield self.get_claim(content_claim_outpoint, include_supports)
+        result = yield self.db.runInteraction(_get_claim_from_stream_hash)
+        if result and include_supports:
+            supports = yield self.get_supports(result['claim_id'])
+            result['supports'] = supports
+            result['effective_amount'] = float(
+                sum([support['amount'] for support in supports]) + result['amount']
+            )
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def get_claim(self, claim_outpoint, include_supports=True):
-        def _claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
-            r = {
-                "name": name,
-                "claim_id": claim_id,
-                "address": address,
-                "claim_sequence": claim_sequence,
-                "value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
-                "height": height,
-                "amount": float(Decimal(amount) / Decimal(COIN)),
-                "nout": int(outpoint.split(":")[1]),
-                "txid": outpoint.split(":")[0],
-                "channel_claim_id": channel_id,
-                "channel_name": None
-            }
-            return r
+    def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
+        def _batch_get_claim(transaction):
+            results = {}
+            bind = "({})".format(','.join('?' for _ in range(len(stream_hashes))))
+            claim_infos = transaction.execute(
+                "select content_claim.stream_hash, c.*, "
+                "case when c.channel_claim_id is not null then "
+                "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                "else null end as channel_name from content_claim "
+                "inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
+                "and content_claim.stream_hash in {} order by c.rowid desc".format(bind),
+                tuple(stream_hashes)
+            ).fetchall()
+            for claim_info in claim_infos:
+                channel_name = claim_info[-1]
+                stream_hash = claim_info[0]
+                result = _format_claim_response(*claim_info[1:-1])
+                if channel_name:
+                    result['channel_name'] = channel_name
+                results[stream_hash] = result
+            return results
 
+        claims = yield self.db.runInteraction(_batch_get_claim)
+        if include_supports:
+            all_supports = {}
+            for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
+                all_supports.setdefault(support['claim_id'], []).append(support)
+            for stream_hash in claims.keys():
+                claim = claims[stream_hash]
+                supports = all_supports.get(claim['claim_id'], [])
+                claim['supports'] = supports
+                claim['effective_amount'] = float(
+                    sum([support['amount'] for support in supports]) + claim['amount']
+                )
+                claims[stream_hash] = claim
+        defer.returnValue(claims)
+
+    @defer.inlineCallbacks
+    def get_claim(self, claim_outpoint, include_supports=True):
         def _get_claim(transaction):
-            claim_info = transaction.execute(
-                "select * from claim where claim_outpoint=?", (claim_outpoint, )
-            ).fetchone()
-            result = _claim_response(*claim_info)
-            if result['channel_claim_id']:
-                channel_name_result = transaction.execute(
-                    "select claim_name from claim where claim_id=?", (result['channel_claim_id'], )
-                ).fetchone()
-                if channel_name_result:
-                    result['channel_name'] = channel_name_result[0]
+            claim_info = transaction.execute("select c.*, "
+                                             "case when c.channel_claim_id is not null then "
+                                             "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                                             "else null end as channel_name from claim c where claim_outpoint = ?",
+                                             (claim_outpoint,)).fetchone()
+            channel_name = claim_info[-1]
+            result = _format_claim_response(*claim_info[:-1])
+            if channel_name:
+                result['channel_name'] = channel_name
             return result
 
         result = yield self.db.runInteraction(_get_claim)
@@ -793,3 +829,21 @@ class SQLiteStorage(object):
             "where r.timestamp is null or r.timestamp < ?",
             self.clock.seconds() - conf.settings['auto_re_reflect_interval']
         )
+
+
+# Helper functions
+def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
+    r = {
+        "name": name,
+        "claim_id": claim_id,
+        "address": address,
+        "claim_sequence": claim_sequence,
+        "value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
+        "height": height,
+        "amount": float(Decimal(amount) / Decimal(COIN)),
+        "nout": int(outpoint.split(":")[1]),
+        "txid": outpoint.split(":")[0],
+        "channel_claim_id": channel_id,
+        "channel_name": None
+    }
+    return r
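
All three claim queries above resolve `channel_name` inline with a correlated subquery instead of issuing a second select per claim the way the old `get_claim` did. The pattern in isolation, as a standalone sketch with a throwaway table (not the real `claim` schema):

```python
# Correlated subquery for channel_name: when channel_claim_id is set, look up
# that claim's name in the same statement; otherwise yield null.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table claim (claim_id text, claim_name text, channel_claim_id text);
    insert into claim values ('chan1', '@channel', null);
    insert into claim values ('c1', 'video', 'chan1');
""")
rows = conn.execute(
    "select c.claim_name, "
    "case when c.channel_claim_id is not null then "
    "(select claim_name from claim where claim_id==c.channel_claim_id) "
    "else null end as channel_name from claim c"
).fetchall()
print(rows)  # [('@channel', None), ('video', '@channel')]
```
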
diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py
index 2e2a054c1..25abd3e18 100644
--- a/lbrynet/file_manager/EncryptedFileDownloader.py
+++ b/lbrynet/file_manager/EncryptedFileDownloader.py
@@ -56,18 +56,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
         self.channel_name = None
         self.metadata = None
 
+    def set_claim_info(self, claim_info):
+        self.claim_id = claim_info['claim_id']
+        self.txid = claim_info['txid']
+        self.nout = claim_info['nout']
+        self.channel_claim_id = claim_info['channel_claim_id']
+        self.outpoint = "%s:%i" % (self.txid, self.nout)
+        self.claim_name = claim_info['name']
+        self.channel_name = claim_info['channel_name']
+        self.metadata = claim_info['value']['stream']['metadata']
+
     @defer.inlineCallbacks
     def get_claim_info(self, include_supports=True):
         claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
         if claim_info:
-            self.claim_id = claim_info['claim_id']
-            self.txid = claim_info['txid']
-            self.nout = claim_info['nout']
-            self.channel_claim_id = claim_info['channel_claim_id']
-            self.outpoint = "%s:%i" % (self.txid, self.nout)
-            self.claim_name = claim_info['name']
-            self.channel_name = claim_info['channel_name']
-            self.metadata = claim_info['value']['stream']['metadata']
+            self.set_claim_info(claim_info)
 
         defer.returnValue(claim_info)
 
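
`set_claim_info` consumes the dict shape that `_format_claim_response` in storage.py produces (with `value` already deserialized into a nested dict). An illustrative sketch of that shape; the values are made up and `lbry_file` stands in for an existing `ManagedEncryptedFileDownloader`:

```python
# Hypothetical claim_info mirroring the keys set_claim_info reads; 'value'
# carries the deserialized claim dict whose stream metadata is kept.
claim_info = {
    "claim_id": "deadbeef" * 5,   # 40-character claim id
    "txid": "f" * 64,             # 64-character transaction id
    "nout": 0,
    "name": "some-stream",
    "channel_claim_id": None,
    "channel_name": None,
    "value": {"stream": {"metadata": {"title": "Some Stream"}}},
}
lbry_file.set_claim_info(claim_info)  # lbry_file: assumed existing downloader
assert lbry_file.outpoint == "%s:%i" % (claim_info["txid"], claim_info["nout"])
```
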
diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py
index 0fffd6e00..02245c39c 100644
--- a/lbrynet/file_manager/EncryptedFileManager.py
+++ b/lbrynet/file_manager/EncryptedFileManager.py
@@ -6,12 +6,11 @@ import logging
 from twisted.internet import defer, task, reactor
 from twisted.python.failure import Failure
 
-from lbrynet.core.Error import InvalidStreamDescriptorError
 from lbrynet.reflector.reupload import reflect_file
 from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
 from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
 from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
-from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
+from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
 from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
 from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
 from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
@@ -96,47 +95,35 @@ class EncryptedFileManager(object):
             suggested_file_name=suggested_file_name
         )
 
-    @defer.inlineCallbacks
-    def _start_lbry_file(self, file_info, payment_rate_manager):
+    def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
         lbry_file = self._get_lbry_file(
             file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
             file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
             file_info['suggested_file_name']
         )
-        yield lbry_file.get_claim_info()
+        if claim_info:
+            lbry_file.set_claim_info(claim_info)
         try:
-            # verify the stream is valid (we might have downloaded an invalid stream
-            # in the past when the validation check didn't work)
-            stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
-            validate_descriptor(stream_info)
-        except InvalidStreamDescriptorError as err:
-            log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
-                        lbry_file.sd_hash, err.message)
-            yield lbry_file.delete_data()
-            yield self.session.storage.delete_stream(lbry_file.stream_hash)
-        else:
-            try:
-                # restore will raise an Exception if status is unknown
-                lbry_file.restore(file_info['status'])
-                self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
-                self.lbry_files.append(lbry_file)
-                if len(self.lbry_files) % 500 == 0:
-                    log.info("Started %i files", len(self.lbry_files))
-            except Exception:
-                log.warning("Failed to start %i", file_info.get('rowid'))
+            # restore will raise an Exception if status is unknown
+            lbry_file.restore(file_info['status'])
+            self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
+            self.lbry_files.append(lbry_file)
+            if len(self.lbry_files) % 500 == 0:
+                log.info("Started %i files", len(self.lbry_files))
+        except Exception:
+            log.warning("Failed to start %i", file_info.get('rowid'))
 
     @defer.inlineCallbacks
     def _start_lbry_files(self):
         files = yield self.session.storage.get_all_lbry_files()
+        claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
         b_prm = self.session.base_payment_rate_manager
         payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
 
         log.info("Starting %i files", len(files))
-        dl = []
         for file_info in files:
-            dl.append(self._start_lbry_file(file_info, payment_rate_manager))
-
-        yield defer.DeferredList(dl)
+            claim_info = claim_infos.get(file_info['stream_hash'])
+            self._start_lbry_file(file_info, payment_rate_manager, claim_info)
 
         log.info("Started %i lbry files", len(self.lbry_files))
         if self.auto_re_reflect is True:
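
`_start_lbry_files` now prefetches every file's claim with one `get_claims_from_stream_hashes` call and restores files synchronously, instead of firing a `get_claim_info` query per file through a `DeferredList`. The batching reduced to its essentials, with hypothetical stand-ins for the manager's collaborators:

```python
# Prefetch-then-start: one query keyed by stream_hash, then a plain loop.
# storage, files and start_one are stand-ins, not the real manager API.
from twisted.internet import defer

@defer.inlineCallbacks
def start_all(storage, files, start_one):
    claim_infos = yield storage.get_claims_from_stream_hashes(
        [f['stream_hash'] for f in files]
    )
    for f in files:
        # streams without a recorded claim get None, matching the
        # `if claim_info:` guard in _start_lbry_file above
        start_one(f, claim_infos.get(f['stream_hash']))
```
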
diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py
index 5bfe72988..5df80ee2e 100644
--- a/lbrynet/tests/unit/database/test_SQLiteStorage.py
+++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py
@@ -163,6 +163,25 @@ class BlobStorageTests(StorageTest):
         self.assertEqual(blob_hashes, [])
 
 
+class SupportsStorageTests(StorageTest):
+    @defer.inlineCallbacks
+    def test_supports_storage(self):
+        claim_ids = [random_lbry_hash() for _ in range(10)]
+        random_supports = [{"txid": random_lbry_hash(), "nout": i, "address": "addr{}".format(i), "amount": i}
+                           for i in range(20)]
+        expected_supports = {}
+        for idx, claim_id in enumerate(claim_ids):
+            yield self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2])
+            for random_support in random_supports[idx*2:idx*2+2]:
+                random_support['claim_id'] = claim_id
+                expected_supports.setdefault(claim_id, []).append(random_support)
+        supports = yield self.storage.get_supports(claim_ids[0])
+        self.assertEqual(supports, expected_supports[claim_ids[0]])
+        all_supports = yield self.storage.get_supports(*claim_ids)
+        for support in all_supports:
+            self.assertIn(support, expected_supports[support['claim_id']])
+
+
 class StreamStorageTests(StorageTest):
     @defer.inlineCallbacks
     def test_store_stream(self, stream_hash=None):