forked from LBRYCommunity/lbry-sdk
migrate blob database to have should_announce and last_announce_time
This commit is contained in:
parent
d840b107c8
commit
8955838191
4 changed files with 108 additions and 2 deletions
|
@ -128,7 +128,10 @@ class DiskBlobManager(DHTHashSupplier):
|
||||||
" blob_hash text primary key, " +
|
" blob_hash text primary key, " +
|
||||||
" blob_length integer, " +
|
" blob_length integer, " +
|
||||||
" last_verified_time real, " +
|
" last_verified_time real, " +
|
||||||
" next_announce_time real)")
|
" next_announce_time real, " +
|
||||||
|
" last_announce_time real, " +
|
||||||
|
" should_announce integer)")
|
||||||
|
|
||||||
|
|
||||||
transaction.execute("create table if not exists download (" +
|
transaction.execute("create table if not exists download (" +
|
||||||
" id integer primary key autoincrement, " +
|
" id integer primary key autoincrement, " +
|
||||||
|
|
|
@ -197,7 +197,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
self.connected_to_internet = True
|
self.connected_to_internet = True
|
||||||
self.connection_status_code = None
|
self.connection_status_code = None
|
||||||
self.platform = None
|
self.platform = None
|
||||||
self.current_db_revision = 3
|
self.current_db_revision = 4
|
||||||
self.db_revision_file = conf.settings.get_db_revision_filename()
|
self.db_revision_file = conf.settings.get_db_revision_filename()
|
||||||
self.session = None
|
self.session = None
|
||||||
self.uploaded_temp_files = []
|
self.uploaded_temp_files = []
|
||||||
|
|
|
@ -10,6 +10,9 @@ def migrate_db(db_dir, start, end):
|
||||||
elif current == 2:
|
elif current == 2:
|
||||||
from lbrynet.db_migrator.migrate2to3 import do_migration
|
from lbrynet.db_migrator.migrate2to3 import do_migration
|
||||||
do_migration(db_dir)
|
do_migration(db_dir)
|
||||||
|
elif current == 3:
|
||||||
|
from lbrynet.db_migrator.migrate3to4 import do_migration
|
||||||
|
do_migration(db_dir)
|
||||||
else:
|
else:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
"DB migration of version {} to {} is not available".format(current, current+1))
|
"DB migration of version {} to {} is not available".format(current, current+1))
|
||||||
|
|
100
lbrynet/db_migrator/migrate3to4.py
Normal file
100
lbrynet/db_migrator/migrate3to4.py
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
import sqlite3
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def do_migration(db_dir):
|
||||||
|
log.info("Doing the migration")
|
||||||
|
migrate_blobs_db(db_dir)
|
||||||
|
log.info("Migration succeeded")
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_blobs_db(db_dir):
|
||||||
|
"""
|
||||||
|
We migrate the blobs.db used in BlobManager to have a "should_announce" column,
|
||||||
|
and set this to True for blobs that are sd_hash's or head blobs (first blob in stream)
|
||||||
|
We also add a "last_announce_time" column for when the blob as last announced
|
||||||
|
for debugging purposes
|
||||||
|
"""
|
||||||
|
|
||||||
|
blobs_db = os.path.join(db_dir, "blobs.db")
|
||||||
|
lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db')
|
||||||
|
|
||||||
|
# skip migration on fresh installs
|
||||||
|
if not os.path.isfile(blobs_db) and not os.path.isfile(lbryfile_info_db):
|
||||||
|
return
|
||||||
|
|
||||||
|
# if blobs.db doesn't exist, skip migration
|
||||||
|
if not os.path.isfile(blobs_db):
|
||||||
|
log.error("blobs.db was not found but lbryfile_info.db was found, skipping migration")
|
||||||
|
return
|
||||||
|
|
||||||
|
blobs_db_file = sqlite3.connect(blobs_db)
|
||||||
|
blobs_db_cursor = blobs_db_file.cursor()
|
||||||
|
|
||||||
|
# check if new columns exist (it shouldn't) and create it
|
||||||
|
try:
|
||||||
|
blobs_db_cursor.execute("SELECT should_announce FROM blobs")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
blobs_db_cursor.execute(
|
||||||
|
"ALTER TABLE blobs ADD COLUMN should_announce integer NOT NULL DEFAULT 0")
|
||||||
|
else:
|
||||||
|
log.warn("should_announce already exists somehow, proceeding anyways")
|
||||||
|
|
||||||
|
try:
|
||||||
|
blobs_db_cursor.execute("SELECT last_announce_time FROM blobs")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
blobs_db_cursor.execute("ALTER TABLE blobs ADD COLUMN last_announce_time")
|
||||||
|
else:
|
||||||
|
log.warn("last_announce_time already exist somehow, proceeding anyways")
|
||||||
|
|
||||||
|
|
||||||
|
# if lbryfile_info.db doesn't exist, skip marking blobs as should_announce = True
|
||||||
|
if not os.path.isfile(lbryfile_info_db):
|
||||||
|
log.error("lbryfile_info.db was not found, skipping check for should_announce")
|
||||||
|
return
|
||||||
|
|
||||||
|
lbryfile_info_file = sqlite3.connect(lbryfile_info_db)
|
||||||
|
lbryfile_info_cursor = lbryfile_info_file.cursor()
|
||||||
|
|
||||||
|
# find blobs that are stream descriptors
|
||||||
|
lbryfile_info_cursor.execute('SELECT * FROM lbry_file_descriptors')
|
||||||
|
descriptors = lbryfile_info_cursor.fetchall()
|
||||||
|
should_announce_blob_hashes = []
|
||||||
|
for d in descriptors:
|
||||||
|
sd_blob_hash = (d[0],)
|
||||||
|
should_announce_blob_hashes.append(sd_blob_hash)
|
||||||
|
|
||||||
|
# find blobs that are the first blob in a stream
|
||||||
|
lbryfile_info_cursor.execute('SELECT * FROM lbry_file_blobs WHERE position = 0')
|
||||||
|
blobs = lbryfile_info_cursor.fetchall()
|
||||||
|
head_blob_hashes = []
|
||||||
|
for b in blobs:
|
||||||
|
blob_hash = (b[0],)
|
||||||
|
should_announce_blob_hashes.append(blob_hash)
|
||||||
|
|
||||||
|
# now mark them as should_announce = True
|
||||||
|
blobs_db_cursor.executemany('UPDATE blobs SET should_announce=1 WHERE blob_hash=?',
|
||||||
|
should_announce_blob_hashes)
|
||||||
|
|
||||||
|
# Now run some final checks here to make sure migration succeeded
|
||||||
|
try:
|
||||||
|
blobs_db_cursor.execute("SELECT should_announce FROM blobs")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
raise Exception('Migration failed, cannot find should_announce')
|
||||||
|
|
||||||
|
try:
|
||||||
|
blobs_db_cursor.execute("SELECT last_announce_time FROM blobs")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
raise Exception('Migration failed, cannot find last_announce_time')
|
||||||
|
|
||||||
|
blobs_db_cursor.execute("SELECT * FROM blobs WHERE should_announce=1")
|
||||||
|
blobs = blobs_db_cursor.fetchall()
|
||||||
|
if len(blobs) != len(should_announce_blob_hashes):
|
||||||
|
log.error("Some how not all blobs were marked as announceable")
|
||||||
|
|
||||||
|
blobs_db_file.commit()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue