forked from LBRYCommunity/lbry-sdk
verify streams on a new migration instead
This commit is contained in:
parent
e1f4623a65
commit
b48492c1d6
4 changed files with 74 additions and 36 deletions
|
@ -199,7 +199,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
self.connected_to_internet = True
|
||||
self.connection_status_code = None
|
||||
self.platform = None
|
||||
self.current_db_revision = 8
|
||||
self.current_db_revision = 9
|
||||
self.db_revision_file = conf.settings.get_db_revision_filename()
|
||||
self.session = None
|
||||
self._session_id = conf.settings.get_session_id()
|
||||
|
@ -244,7 +244,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
yield self._start_analytics()
|
||||
yield add_lbry_file_to_sd_identifier(self.sd_identifier)
|
||||
yield self._setup_stream_identifier()
|
||||
yield self._setup_lbry_file_manager(verify_streams=migrated)
|
||||
yield self._setup_lbry_file_manager()
|
||||
yield self._setup_query_handlers()
|
||||
yield self._setup_server()
|
||||
log.info("Starting balance: " + str(self.session.wallet.get_balance()))
|
||||
|
@ -512,11 +512,11 @@ class Daemon(AuthJSONRPCServer):
|
|||
defer.returnValue(migrated)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _setup_lbry_file_manager(self, verify_streams):
|
||||
def _setup_lbry_file_manager(self):
|
||||
log.info('Starting the file manager')
|
||||
self.startup_status = STARTUP_STAGES[3]
|
||||
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
|
||||
yield self.lbry_file_manager.setup(verify_streams)
|
||||
yield self.lbry_file_manager.setup()
|
||||
log.info('Done setting up file manager')
|
||||
|
||||
def _start_analytics(self):
|
||||
|
|
|
@ -18,6 +18,8 @@ def migrate_db(db_dir, start, end):
|
|||
from lbrynet.database.migrator.migrate6to7 import do_migration
|
||||
elif current == 7:
|
||||
from lbrynet.database.migrator.migrate7to8 import do_migration
|
||||
elif current == 8:
|
||||
from lbrynet.database.migrator.migrate8to9 import do_migration
|
||||
else:
|
||||
raise Exception("DB migration of version {} to {} is not available".format(current,
|
||||
current+1))
|
||||
|
|
54
lbrynet/database/migrator/migrate8to9.py
Normal file
54
lbrynet/database/migrator/migrate8to9.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
import sqlite3
|
||||
import logging
|
||||
import os
|
||||
|
||||
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
|
||||
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def do_migration(db_dir):
|
||||
db_path = os.path.join(db_dir, "lbrynet.sqlite")
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
connection = sqlite3.connect(db_path)
|
||||
cursor = connection.cursor()
|
||||
|
||||
query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream"
|
||||
streams = cursor.execute(query).fetchall()
|
||||
|
||||
blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s "
|
||||
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
|
||||
blobs_by_stream = {}
|
||||
for stream_hash, position, iv, blob_hash, blob_length in blobs:
|
||||
blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
|
||||
|
||||
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
|
||||
sd_info = format_sd_info(
|
||||
EncryptedFileStreamType, stream_name, stream_key,
|
||||
suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
|
||||
)
|
||||
try:
|
||||
validate_descriptor(sd_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||
sd_hash, err.message)
|
||||
blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
|
||||
delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
|
||||
|
||||
connection.commit()
|
||||
connection.close()
|
||||
|
||||
|
||||
def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir):
|
||||
transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,))
|
||||
transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
|
||||
transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
|
||||
transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
|
||||
transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
|
||||
for blob_hash in blob_hashes:
|
||||
transaction.execute("delete from blob where blob_hash=?", (blob_hash, ))
|
||||
file_path = os.path.join(blob_dir, blob_hash)
|
||||
if os.path.isfile(file_path):
|
||||
os.unlink(file_path)
|
|
@ -6,12 +6,11 @@ import logging
|
|||
|
||||
from twisted.internet import defer, task, reactor
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||
from lbrynet.reflector.reupload import reflect_file
|
||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
|
||||
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
||||
|
@ -42,9 +41,9 @@ class EncryptedFileManager(object):
|
|||
self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setup(self, verify_streams=False):
|
||||
def setup(self):
|
||||
yield self._add_to_sd_identifier()
|
||||
yield self._start_lbry_files(verify_streams)
|
||||
yield self._start_lbry_files()
|
||||
log.info("Started file manager")
|
||||
|
||||
def get_lbry_file_status(self, lbry_file):
|
||||
|
@ -96,8 +95,7 @@ class EncryptedFileManager(object):
|
|||
suggested_file_name=suggested_file_name
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info):
|
||||
def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
|
||||
lbry_file = self._get_lbry_file(
|
||||
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
||||
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
||||
|
@ -106,42 +104,26 @@ class EncryptedFileManager(object):
|
|||
if claim_info:
|
||||
lbry_file.set_claim_info(claim_info)
|
||||
try:
|
||||
# verify if the stream is valid (we might have downloaded an invalid stream
|
||||
# in the past when the validation check didn't work. This runs after every
|
||||
# migration to ensure blobs migrated from that past version gets verified)
|
||||
if verify_stream:
|
||||
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
|
||||
validate_descriptor(stream_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||
lbry_file.sd_hash, err.message)
|
||||
yield lbry_file.delete_data()
|
||||
yield self.session.storage.delete_stream(lbry_file.stream_hash)
|
||||
else:
|
||||
try:
|
||||
# restore will raise an Exception if status is unknown
|
||||
lbry_file.restore(file_info['status'])
|
||||
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
||||
self.lbry_files.append(lbry_file)
|
||||
if len(self.lbry_files) % 500 == 0:
|
||||
log.info("Started %i files", len(self.lbry_files))
|
||||
except Exception:
|
||||
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||
# restore will raise an Exception if status is unknown
|
||||
lbry_file.restore(file_info['status'])
|
||||
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
||||
self.lbry_files.append(lbry_file)
|
||||
if len(self.lbry_files) % 500 == 0:
|
||||
log.info("Started %i files", len(self.lbry_files))
|
||||
except Exception:
|
||||
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_files(self, verify_streams):
|
||||
def _start_lbry_files(self):
|
||||
files = yield self.session.storage.get_all_lbry_files()
|
||||
claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
|
||||
b_prm = self.session.base_payment_rate_manager
|
||||
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
||||
|
||||
log.info("Starting %i files", len(files))
|
||||
dl = []
|
||||
for file_info in files:
|
||||
claim_info = claim_infos.get(file_info['stream_hash'])
|
||||
dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info))
|
||||
|
||||
yield defer.DeferredList(dl)
|
||||
self._start_lbry_file(file_info, payment_rate_manager, claim_info)
|
||||
|
||||
log.info("Started %i lbry files", len(self.lbry_files))
|
||||
if self.auto_re_reflect is True:
|
||||
|
|
Loading…
Add table
Reference in a new issue