From df735252e54a3aeae229e4261d8cb9c45ce145d4 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 8 May 2018 16:49:17 -0300
Subject: [PATCH 1/7] verify streams only after migration

---
 lbrynet/daemon/Daemon.py                     |  6 +++---
 lbrynet/file_manager/EncryptedFileManager.py | 20 +++++++++++---------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 43c67e0d7..0eaa24285 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -244,7 +244,7 @@ class Daemon(AuthJSONRPCServer):
         yield self._start_analytics()
         yield add_lbry_file_to_sd_identifier(self.sd_identifier)
         yield self._setup_stream_identifier()
-        yield self._setup_lbry_file_manager()
+        yield self._setup_lbry_file_manager(verify_streams=migrated)
         yield self._setup_query_handlers()
         yield self._setup_server()
         log.info("Starting balance: " + str(self.session.wallet.get_balance()))
@@ -512,11 +512,11 @@ class Daemon(AuthJSONRPCServer):
         defer.returnValue(migrated)
 
     @defer.inlineCallbacks
-    def _setup_lbry_file_manager(self):
+    def _setup_lbry_file_manager(self, verify_streams):
         log.info('Starting the file manager')
         self.startup_status = STARTUP_STAGES[3]
         self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
-        yield self.lbry_file_manager.setup()
+        yield self.lbry_file_manager.setup(verify_streams)
         log.info('Done setting up file manager')
 
     def _start_analytics(self):
diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py
index 0fffd6e00..73cc3fb12 100644
--- a/lbrynet/file_manager/EncryptedFileManager.py
+++ b/lbrynet/file_manager/EncryptedFileManager.py
@@ -42,9 +42,9 @@ class EncryptedFileManager(object):
         self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
 
     @defer.inlineCallbacks
-    def setup(self):
+    def setup(self, verify_streams=False):
         yield self._add_to_sd_identifier()
-        yield self._start_lbry_files()
+        yield self._start_lbry_files(verify_streams)
         log.info("Started file manager")
 
     def get_lbry_file_status(self, lbry_file):
@@ -97,7 +97,7 @@ class EncryptedFileManager(object):
         )
 
     @defer.inlineCallbacks
-    def _start_lbry_file(self, file_info, payment_rate_manager):
+    def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream):
         lbry_file = self._get_lbry_file(
             file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
             file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
@@ -105,10 +105,12 @@ class EncryptedFileManager(object):
         )
         yield lbry_file.get_claim_info()
         try:
-            # verify the stream is valid (we might have downloaded an invalid stream
-            # in the past when the validation check didn't work)
-            stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
-            validate_descriptor(stream_info)
+            # verify that the stream is valid (we might have downloaded an invalid stream
+            # in the past when the validation check didn't work). This runs after every
+            # migration to ensure blobs migrated from that past version get verified
+            if verify_stream:
+                stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
+                validate_descriptor(stream_info)
         except InvalidStreamDescriptorError as err:
             log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
                         lbry_file.sd_hash, err.message)
@@ -126,7 +128,7 @@ class EncryptedFileManager(object):
                 log.warning("Failed to start %i", file_info.get('rowid'))
 
     @defer.inlineCallbacks
-    def _start_lbry_files(self):
+    def _start_lbry_files(self, verify_streams):
         files = yield self.session.storage.get_all_lbry_files()
         b_prm = self.session.base_payment_rate_manager
         payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
@@ -134,7 +136,7 @@ class EncryptedFileManager(object):
         log.info("Starting %i files", len(files))
         dl = []
         for file_info in files:
-            dl.append(self._start_lbry_file(file_info, payment_rate_manager))
+            dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams))
 
         yield defer.DeferredList(dl)
 

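The patch above threads a `verify_streams` flag from the daemon's migration step into `EncryptedFileManager.setup()` and down to each `_start_lbry_file` call, so descriptor validation only runs on a startup that follows a database migration. A minimal sketch of that gating in plain Python (the `descriptor_is_valid` callable and the dict are stand-ins for the real `get_sd_info` + `validate_descriptor` pair and `file_info`, not project code):

    def start_file(file_info, verify_stream, descriptor_is_valid):
        # only pay the validation cost when the flag is set, i.e. right after a migration
        if verify_stream and not descriptor_is_valid(file_info):
            # the real code deletes the stream's data and database rows here
            return "cleaned up"
        # the real code restores the file status and registers claim callbacks here
        return "started"

    # normal startup: verify_streams is False, nothing is re-validated
    print(start_file({"stream_hash": "ab" * 48}, False, lambda info: False))  # started
    # first startup after a migration: invalid descriptors get cleaned up
    print(start_file({"stream_hash": "ab" * 48}, True, lambda info: False))   # cleaned up
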
From acd330aa2a6cbc072ad41582b5b5bf4bacfa5e52 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 8 May 2018 17:49:49 -0300
Subject: [PATCH 2/7] get claim info and channel name using a single query

---
 lbrynet/database/storage.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 122f7a866..8c8d29717 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -718,16 +718,15 @@ class SQLiteStorage(object):
             return r
 
         def _get_claim(transaction):
-            claim_info = transaction.execute(
-                "select * from claim where claim_outpoint=?", (claim_outpoint, )
-            ).fetchone()
-            result = _claim_response(*claim_info)
-            if result['channel_claim_id']:
-                channel_name_result = transaction.execute(
-                    "select claim_name from claim where claim_id=?", (result['channel_claim_id'], )
-                ).fetchone()
-                if channel_name_result:
-                    result['channel_name'] = channel_name_result[0]
+            claim_info = transaction.execute("select c.*, "
+                                             "case when c.channel_claim_id is not null then "
+                                             "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                                             "else null end as channel_name from claim c where claim_outpoint = ?",
+                                             (claim_outpoint,)).fetchone()
+            channel_name = claim_info[-1]
+            result = _claim_response(*claim_info[:-1])
+            if channel_name:
+                result['channel_name'] = channel_name
             return result
 
         result = yield self.db.runInteraction(_get_claim)

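The rewritten query resolves the channel name with a correlated subquery, so a single SELECT returns the claim row plus one trailing `channel_name` column instead of issuing a second lookup. A self-contained sqlite3 sketch of the same pattern (the schema here is trimmed to the columns involved, not the project's real `claim` table):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table claim (
            claim_outpoint text primary key,
            claim_id text,
            claim_name text,
            channel_claim_id text
        );
        insert into claim values ('txid1:0', 'claimA', 'some-video', 'chan1');
        insert into claim values ('txid2:0', 'chan1', '@my-channel', null);
    """)

    row = conn.execute(
        "select c.*, "
        "case when c.channel_claim_id is not null then "
        "(select claim_name from claim where claim_id=c.channel_claim_id) "
        "else null end as channel_name "
        "from claim c where claim_outpoint=?", ("txid1:0",)
    ).fetchone()

    channel_name = row[-1]    # '@my-channel', resolved in the same round trip
    claim_columns = row[:-1]  # the plain claim columns, unpacked as before
    print(channel_name, claim_columns)
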
From d55ded78eed85ffe95eb9767ae542e62863e6951 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 8 May 2018 18:30:58 -0300
Subject: [PATCH 3/7] get claim with channel_name from a stream hash in a
 single query

---
 lbrynet/database/storage.py | 68 ++++++++++++++++++++++---------------
 1 file changed, 40 insertions(+), 28 deletions(-)

diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 8c8d29717..2e6bee68d 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -683,40 +683,34 @@ class SQLiteStorage(object):
 
     @defer.inlineCallbacks
     def get_content_claim(self, stream_hash, include_supports=True):
-        def _get_content_claim(transaction):
-            claim_id = transaction.execute(
-                "select claim.claim_outpoint from content_claim "
-                "inner join claim on claim.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash=? "
-                "order by claim.rowid desc", (stream_hash, )
+        def _get_claim_from_stream_hash(transaction):
+            claim_info = transaction.execute(
+                "select c.*, "
+                "case when c.channel_claim_id is not null then "
+                "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                "else null end as channel_name from content_claim "
+                "inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
+                "and content_claim.stream_hash=? order by c.rowid desc", (stream_hash,)
             ).fetchone()
-            if not claim_id:
+            if not claim_info:
                 return None
-            return claim_id[0]
+            channel_name = claim_info[-1]
+            result = _format_claim_response(*claim_info[:-1])
+            if channel_name:
+                result['channel_name'] = channel_name
+            return result
 
-        content_claim_outpoint = yield self.db.runInteraction(_get_content_claim)
-        result = None
-        if content_claim_outpoint:
-            result = yield self.get_claim(content_claim_outpoint, include_supports)
+        result = yield self.db.runInteraction(_get_claim_from_stream_hash)
+        if result and include_supports:
+            supports = yield self.get_supports(result['claim_id'])
+            result['supports'] = supports
+            result['effective_amount'] = float(
+                sum([support['amount'] for support in supports]) + result['amount']
+            )
         defer.returnValue(result)
 
     @defer.inlineCallbacks
     def get_claim(self, claim_outpoint, include_supports=True):
-        def _claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
-            r = {
-                "name": name,
-                "claim_id": claim_id,
-                "address": address,
-                "claim_sequence": claim_sequence,
-                "value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
-                "height": height,
-                "amount": float(Decimal(amount) / Decimal(COIN)),
-                "nout": int(outpoint.split(":")[1]),
-                "txid": outpoint.split(":")[0],
-                "channel_claim_id": channel_id,
-                "channel_name": None
-            }
-            return r
-
         def _get_claim(transaction):
             claim_info = transaction.execute("select c.*, "
                                              "case when c.channel_claim_id is not null then "
@@ -724,7 +718,7 @@ class SQLiteStorage(object):
                                              "else null end as channel_name from claim c where claim_outpoint = ?",
                                              (claim_outpoint,)).fetchone()
             channel_name = claim_info[-1]
-            result = _claim_response(*claim_info[:-1])
+            result = _format_claim_response(*claim_info[:-1])
             if channel_name:
                 result['channel_name'] = channel_name
             return result
@@ -792,3 +786,21 @@ class SQLiteStorage(object):
             "where r.timestamp is null or r.timestamp < ?",
             self.clock.seconds() - conf.settings['auto_re_reflect_interval']
         )
+
+
+# Helper functions
+def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
+    r = {
+        "name": name,
+        "claim_id": claim_id,
+        "address": address,
+        "claim_sequence": claim_sequence,
+        "value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
+        "height": height,
+        "amount": float(Decimal(amount) / Decimal(COIN)),
+        "nout": int(outpoint.split(":")[1]),
+        "txid": outpoint.split(":")[0],
+        "channel_claim_id": channel_id,
+        "channel_name": None
+    }
+    return r

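With the claim lookup now keyed by stream hash, support totals are attached afterwards: `effective_amount` is the claim's own amount plus the sum of its support amounts, and the row-to-dict conversion lives in the new module-level `_format_claim_response` helper. A small worked sketch of both steps with made-up values (`COIN` is assumed to be the usual 10^8 denomination constant; `format_claim` is a trimmed-down analogue, not the real helper):

    from decimal import Decimal

    COIN = 100000000  # assumed denomination constant (deweys per LBC)

    def format_claim(outpoint, claim_id, name, amount):
        # simplified analogue of _format_claim_response
        txid, nout = outpoint.split(":")
        return {
            "claim_id": claim_id,
            "name": name,
            "txid": txid,
            "nout": int(nout),
            "amount": float(Decimal(amount) / Decimal(COIN)),
        }

    claim = format_claim("deadbeef:1", "claimA", "some-video", 150000000)
    supports = [{"amount": 0.5}, {"amount": 2.0}]
    claim["supports"] = supports
    claim["effective_amount"] = float(sum(s["amount"] for s in supports) + claim["amount"])
    assert claim["amount"] == 1.5 and claim["effective_amount"] == 4.0
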
From ca86af736eb794ff291c7cddd776db2aaa4c0dff Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 8 May 2018 19:46:29 -0300
Subject: [PATCH 4/7] add batching support to get_supports and tests

---
 lbrynet/database/storage.py                   |  9 +++++++--
 .../tests/unit/database/test_SQLiteStorage.py | 19 +++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 2e6bee68d..8f7a4b2cf 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -552,7 +552,7 @@ class SQLiteStorage(object):
                 )
         return self.db.runInteraction(_save_support)
 
-    def get_supports(self, claim_id):
+    def get_supports(self, *claim_ids):
         def _format_support(outpoint, supported_id, amount, address):
             return {
                 "txid": outpoint.split(":")[0],
@@ -563,10 +563,15 @@ class SQLiteStorage(object):
             }
 
         def _get_supports(transaction):
+            if len(claim_ids) == 1:
+                bind = "=?"
+            else:
+                bind = "in ({})".format(','.join('?' for _ in range(len(claim_ids))))
             return [
                 _format_support(*support_info)
                 for support_info in transaction.execute(
-                    "select * from support where claim_id=?", (claim_id, )
+                    "select * from support where claim_id {}".format(bind),
+                    tuple(claim_ids)
                 ).fetchall()
             ]
 
diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py
index 5bfe72988..5df80ee2e 100644
--- a/lbrynet/tests/unit/database/test_SQLiteStorage.py
+++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py
@@ -163,6 +163,25 @@ class BlobStorageTests(StorageTest):
         self.assertEqual(blob_hashes, [])
 
 
+class SupportsStorageTests(StorageTest):
+    @defer.inlineCallbacks
+    def test_supports_storage(self):
+        claim_ids = [random_lbry_hash() for _ in range(10)]
+        random_supports = [{"txid": random_lbry_hash(), "nout": i, "address": "addr{}".format(i), "amount": i}
+                           for i in range(20)]
+        expected_supports = {}
+        for idx, claim_id in enumerate(claim_ids):
+            yield self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2])
+            for random_support in random_supports[idx*2:idx*2+2]:
+                random_support['claim_id'] = claim_id
+                expected_supports.setdefault(claim_id, []).append(random_support)
+        supports = yield self.storage.get_supports(claim_ids[0])
+        self.assertEqual(supports, expected_supports[claim_ids[0]])
+        all_supports = yield self.storage.get_supports(*claim_ids)
+        for support in all_supports:
+            self.assertIn(support, expected_supports[support['claim_id']])
+
+
 class StreamStorageTests(StorageTest):
     @defer.inlineCallbacks
     def test_store_stream(self, stream_hash=None):

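`get_supports` now accepts a variable number of claim ids and picks between an `=?` equality and an `in (?, ..., ?)` clause, binding one placeholder per id. A self-contained sketch of that placeholder construction against a simplified in-memory table (not the project's real `support` schema):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table support (support_outpoint text, claim_id text, amount integer, address text);
        insert into support values ('tx1:0', 'claimA', 1, 'addr1');
        insert into support values ('tx2:0', 'claimB', 2, 'addr2');
        insert into support values ('tx3:0', 'claimB', 3, 'addr3');
    """)

    def get_supports(*claim_ids):
        # same trick as the patch: "=?" for a single id, "in (?,?,...)" for several
        if len(claim_ids) == 1:
            bind = "=?"
        else:
            bind = "in ({})".format(",".join("?" for _ in claim_ids))
        return conn.execute(
            "select * from support where claim_id {}".format(bind), tuple(claim_ids)
        ).fetchall()

    print(get_supports("claimA"))            # single id, equality match
    print(get_supports("claimA", "claimB"))  # several ids, IN (...) match
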
From b6cedfec56b09bf7b5e5912a51eb22b4724982b7 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Wed, 9 May 2018 10:50:44 -0300
Subject: [PATCH 5/7] batch-start the file manager

---
 lbrynet/database/storage.py                   | 38 +++++++++++++++++++
 .../file_manager/EncryptedFileDownloader.py   | 19 ++++++----
 lbrynet/file_manager/EncryptedFileManager.py  |  9 +++--
 3 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 8f7a4b2cf..aae8f180a 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -714,6 +714,44 @@ class SQLiteStorage(object):
             )
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
+        def _batch_get_claim(transaction):
+            results = {}
+            bind = "({})".format(','.join('?' for _ in range(len(stream_hashes))))
+            claim_infos = transaction.execute(
+                "select content_claim.stream_hash, c.*, "
+                "case when c.channel_claim_id is not null then "
+                "(select claim_name from claim where claim_id==c.channel_claim_id) "
+                "else null end as channel_name from content_claim "
+                "inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
+                "and content_claim.stream_hash in {} order by c.rowid desc".format(bind),
+                tuple(stream_hashes)
+            ).fetchall()
+            for claim_info in claim_infos:
+                channel_name = claim_info[-1]
+                stream_hash = claim_info[0]
+                result = _format_claim_response(*claim_info[1:-1])
+                if channel_name:
+                    result['channel_name'] = channel_name
+                results[stream_hash] = result
+            return results
+
+        claims = yield self.db.runInteraction(_batch_get_claim)
+        if include_supports:
+            all_supports = {}
+            for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
+                all_supports.setdefault(support['claim_id'], []).append(support)
+            for stream_hash in claims.keys():
+                claim = claims[stream_hash]
+                supports = all_supports.get(claim['claim_id'], [])
+                claim['supports'] = supports
+                claim['effective_amount'] = float(
+                    sum([support['amount'] for support in supports]) + claim['amount']
+                )
+                claims[stream_hash] = claim
+        defer.returnValue(claims)
+
     @defer.inlineCallbacks
     def get_claim(self, claim_outpoint, include_supports=True):
         def _get_claim(transaction):
diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py
index 2e2a054c1..25abd3e18 100644
--- a/lbrynet/file_manager/EncryptedFileDownloader.py
+++ b/lbrynet/file_manager/EncryptedFileDownloader.py
@@ -56,18 +56,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
         self.channel_name = None
         self.metadata = None
 
+    def set_claim_info(self, claim_info):
+        self.claim_id = claim_info['claim_id']
+        self.txid = claim_info['txid']
+        self.nout = claim_info['nout']
+        self.channel_claim_id = claim_info['channel_claim_id']
+        self.outpoint = "%s:%i" % (self.txid, self.nout)
+        self.claim_name = claim_info['name']
+        self.channel_name = claim_info['channel_name']
+        self.metadata = claim_info['value']['stream']['metadata']
+
     @defer.inlineCallbacks
     def get_claim_info(self, include_supports=True):
         claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
         if claim_info:
-            self.claim_id = claim_info['claim_id']
-            self.txid = claim_info['txid']
-            self.nout = claim_info['nout']
-            self.channel_claim_id = claim_info['channel_claim_id']
-            self.outpoint = "%s:%i" % (self.txid, self.nout)
-            self.claim_name = claim_info['name']
-            self.channel_name = claim_info['channel_name']
-            self.metadata = claim_info['value']['stream']['metadata']
+            self.set_claim_info(claim_info)
 
         defer.returnValue(claim_info)
 
diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py
index 73cc3fb12..d28006dbd 100644
--- a/lbrynet/file_manager/EncryptedFileManager.py
+++ b/lbrynet/file_manager/EncryptedFileManager.py
@@ -97,13 +97,14 @@ class EncryptedFileManager(object):
         )
 
     @defer.inlineCallbacks
-    def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream):
+    def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info):
         lbry_file = self._get_lbry_file(
             file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
             file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
             file_info['suggested_file_name']
         )
-        yield lbry_file.get_claim_info()
+        if claim_info:
+            lbry_file.set_claim_info(claim_info)
         try:
             # verify that the stream is valid (we might have downloaded an invalid stream
             # in the past when the validation check didn't work). This runs after every
@@ -130,13 +131,15 @@ class EncryptedFileManager(object):
     @defer.inlineCallbacks
     def _start_lbry_files(self, verify_streams):
         files = yield self.session.storage.get_all_lbry_files()
+        claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
         b_prm = self.session.base_payment_rate_manager
         payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
 
         log.info("Starting %i files", len(files))
         dl = []
         for file_info in files:
-            dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams))
+            claim_info = claim_infos.get(file_info['stream_hash'])
+            dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info))
 
         yield defer.DeferredList(dl)
 

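Instead of one claim query and one supports query per file, startup now issues a single `get_claims_from_stream_hashes` call keyed by stream hash plus a single `get_supports` call for all claim ids, then regroups the flat support list per claim in Python. A sketch of the regrouping and `effective_amount` step, with the data shapes taken from the patch but made-up values:

    # claims keyed by stream_hash, as the batch query would return them
    claims = {
        "stream1": {"claim_id": "claimA", "amount": 1.0},
        "stream2": {"claim_id": "claimB", "amount": 2.0},
    }
    # one flat list of support rows for every claim_id, fetched in a single call
    all_support_rows = [
        {"claim_id": "claimA", "amount": 0.5},
        {"claim_id": "claimB", "amount": 1.0},
        {"claim_id": "claimB", "amount": 3.0},
    ]

    grouped = {}
    for support in all_support_rows:
        grouped.setdefault(support["claim_id"], []).append(support)

    for stream_hash, claim in claims.items():
        supports = grouped.get(claim["claim_id"], [])
        claim["supports"] = supports
        claim["effective_amount"] = float(
            sum(s["amount"] for s in supports) + claim["amount"]
        )

    assert claims["stream1"]["effective_amount"] == 1.5
    assert claims["stream2"]["effective_amount"] == 6.0
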
From e1f4623a652556d76ef9ef64289234354de16672 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Wed, 9 May 2018 12:40:03 -0300
Subject: [PATCH 6/7] add changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b94d5fd44..d024e5292 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,8 @@ at anytime.
   * changed txrequests for treq
   * changed cryptography version to 2.2.2
   * removed pycrypto dependency, replacing all calls to cryptography
+  * full stream verification to run only during database migration instead of on every startup
+  * file manager startup to use batched database queries for claim and support info
   * several internal dht functions to use inlineCallbacks
   * `DHTHashAnnouncer` and `Node` manage functions to use `LoopingCall`s instead of scheduling with `callLater`.
   * `store` kademlia rpc method to block on the call finishing and to return storing peer information

From b48492c1d67250e10dbf0fd598b57f71442a5560 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 15 May 2018 21:44:30 -0300
Subject: [PATCH 7/7] verify streams on a new migration instead

---
 lbrynet/daemon/Daemon.py                     |  8 +--
 lbrynet/database/migrator/dbmigrator.py      |  2 +
 lbrynet/database/migrator/migrate8to9.py     | 54 ++++++++++++++++++++
 lbrynet/file_manager/EncryptedFileManager.py | 46 +++++------------
 4 files changed, 74 insertions(+), 36 deletions(-)
 create mode 100644 lbrynet/database/migrator/migrate8to9.py

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 0eaa24285..3e5b407ee 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer):
         self.connected_to_internet = True
         self.connection_status_code = None
         self.platform = None
-        self.current_db_revision = 8
+        self.current_db_revision = 9
         self.db_revision_file = conf.settings.get_db_revision_filename()
         self.session = None
         self._session_id = conf.settings.get_session_id()
@@ -244,7 +244,7 @@ class Daemon(AuthJSONRPCServer):
         yield self._start_analytics()
         yield add_lbry_file_to_sd_identifier(self.sd_identifier)
         yield self._setup_stream_identifier()
-        yield self._setup_lbry_file_manager(verify_streams=migrated)
+        yield self._setup_lbry_file_manager()
         yield self._setup_query_handlers()
         yield self._setup_server()
         log.info("Starting balance: " + str(self.session.wallet.get_balance()))
@@ -512,11 +512,11 @@ class Daemon(AuthJSONRPCServer):
         defer.returnValue(migrated)
 
     @defer.inlineCallbacks
-    def _setup_lbry_file_manager(self, verify_streams):
+    def _setup_lbry_file_manager(self):
         log.info('Starting the file manager')
         self.startup_status = STARTUP_STAGES[3]
         self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
-        yield self.lbry_file_manager.setup(verify_streams)
+        yield self.lbry_file_manager.setup()
         log.info('Done setting up file manager')
 
     def _start_analytics(self):
diff --git a/lbrynet/database/migrator/dbmigrator.py b/lbrynet/database/migrator/dbmigrator.py
index ab1519380..196263f0a 100644
--- a/lbrynet/database/migrator/dbmigrator.py
+++ b/lbrynet/database/migrator/dbmigrator.py
@@ -18,6 +18,8 @@ def migrate_db(db_dir, start, end):
             from lbrynet.database.migrator.migrate6to7 import do_migration
         elif current == 7:
             from lbrynet.database.migrator.migrate7to8 import do_migration
+        elif current == 8:
+            from lbrynet.database.migrator.migrate8to9 import do_migration
         else:
             raise Exception("DB migration of version {} to {} is not available".format(current,
                                                                                        current+1))
diff --git a/lbrynet/database/migrator/migrate8to9.py b/lbrynet/database/migrator/migrate8to9.py
new file mode 100644
index 000000000..a518e9899
--- /dev/null
+++ b/lbrynet/database/migrator/migrate8to9.py
@@ -0,0 +1,54 @@
+import sqlite3
+import logging
+import os
+
+from lbrynet.core.Error import InvalidStreamDescriptorError
+from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
+from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
+
+log = logging.getLogger(__name__)
+
+
+def do_migration(db_dir):
+    db_path = os.path.join(db_dir, "lbrynet.sqlite")
+    blob_dir = os.path.join(db_dir, "blobfiles")
+    connection = sqlite3.connect(db_path)
+    cursor = connection.cursor()
+
+    query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream"
+    streams = cursor.execute(query).fetchall()
+
+    blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s "
+                           "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
+    blobs_by_stream = {}
+    for stream_hash, position, iv, blob_hash, blob_length in blobs:
+        blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
+
+    for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
+        sd_info = format_sd_info(
+            EncryptedFileStreamType, stream_name, stream_key,
+            suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
+        )
+        try:
+            validate_descriptor(sd_info)
+        except InvalidStreamDescriptorError as err:
+            log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
+                        sd_hash, err.message)
+            blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
+            delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
+
+    connection.commit()
+    connection.close()
+
+
+def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir):
+    transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,))
+    transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
+    transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
+    transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
+    transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
+    for blob_hash in blob_hashes:
+        transaction.execute("delete from blob where blob_hash=?", (blob_hash, ))
+        file_path = os.path.join(blob_dir, blob_hash)
+        if os.path.isfile(file_path):
+            os.unlink(file_path)
diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py
index d28006dbd..02245c39c 100644
--- a/lbrynet/file_manager/EncryptedFileManager.py
+++ b/lbrynet/file_manager/EncryptedFileManager.py
@@ -6,12 +6,11 @@ import logging
 
 from twisted.internet import defer, task, reactor
 from twisted.python.failure import Failure
-from lbrynet.core.Error import InvalidStreamDescriptorError
 from lbrynet.reflector.reupload import reflect_file
 from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
 from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
 from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
-from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
+from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
 from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
 from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
 from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
@@ -42,9 +41,9 @@ class EncryptedFileManager(object):
         self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
 
     @defer.inlineCallbacks
-    def setup(self, verify_streams=False):
+    def setup(self):
         yield self._add_to_sd_identifier()
-        yield self._start_lbry_files(verify_streams)
+        yield self._start_lbry_files()
         log.info("Started file manager")
 
     def get_lbry_file_status(self, lbry_file):
@@ -96,8 +95,7 @@ class EncryptedFileManager(object):
             suggested_file_name=suggested_file_name
         )
 
-    @defer.inlineCallbacks
-    def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info):
+    def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
         lbry_file = self._get_lbry_file(
             file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
             file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
@@ -106,42 +104,26 @@ class EncryptedFileManager(object):
         if claim_info:
             lbry_file.set_claim_info(claim_info)
         try:
-            # verify that the stream is valid (we might have downloaded an invalid stream
-            # in the past when the validation check didn't work). This runs after every
-            # migration to ensure blobs migrated from that past version get verified
-            if verify_stream:
-                stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
-                validate_descriptor(stream_info)
-        except InvalidStreamDescriptorError as err:
-            log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
-                        lbry_file.sd_hash, err.message)
-            yield lbry_file.delete_data()
-            yield self.session.storage.delete_stream(lbry_file.stream_hash)
-        else:
-            try:
-                # restore will raise an Exception if status is unknown
-                lbry_file.restore(file_info['status'])
-                self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
-                self.lbry_files.append(lbry_file)
-                if len(self.lbry_files) % 500 == 0:
-                    log.info("Started %i files", len(self.lbry_files))
-            except Exception:
-                log.warning("Failed to start %i", file_info.get('rowid'))
+            # restore will raise an Exception if status is unknown
+            lbry_file.restore(file_info['status'])
+            self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
+            self.lbry_files.append(lbry_file)
+            if len(self.lbry_files) % 500 == 0:
+                log.info("Started %i files", len(self.lbry_files))
+        except Exception:
+            log.warning("Failed to start %i", file_info.get('rowid'))
 
     @defer.inlineCallbacks
-    def _start_lbry_files(self, verify_streams):
+    def _start_lbry_files(self):
         files = yield self.session.storage.get_all_lbry_files()
         claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
         b_prm = self.session.base_payment_rate_manager
         payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
 
         log.info("Starting %i files", len(files))
-        dl = []
         for file_info in files:
             claim_info = claim_infos.get(file_info['stream_hash'])
-            dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info))
-
-        yield defer.DeferredList(dl)
+            self._start_lbry_file(file_info, payment_rate_manager, claim_info)
 
         log.info("Started %i lbry files", len(self.lbry_files))
         if self.auto_re_reflect is True:
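
The final patch moves validation out of the file manager entirely: the new 8-to-9 migration rebuilds each stored stream's descriptor from the `stream` and `stream_blob` rows, validates it once, and deletes any stream (database rows plus blob files) whose descriptor is invalid. A rough sketch of that control flow, with `rebuild_descriptor`, `is_valid_descriptor` and `delete_stream` standing in for the real `format_sd_info`/`validate_descriptor`/cleanup helpers:

    import sqlite3

    def migrate_streams(db_path, blob_dir, rebuild_descriptor, is_valid_descriptor, delete_stream):
        # simplified shape of migrate8to9.do_migration: validate every stored
        # stream exactly once, cleaning up the ones that fail
        connection = sqlite3.connect(db_path)
        cursor = connection.cursor()
        streams = cursor.execute("select stream_hash, sd_hash from stream").fetchall()
        for stream_hash, sd_hash in streams:
            descriptor = rebuild_descriptor(cursor, stream_hash)
            if not is_valid_descriptor(descriptor):
                delete_stream(cursor, stream_hash, sd_hash, blob_dir)
        connection.commit()
        connection.close()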