add lbry_file_metadata table to save outpoint data of downloads

This commit is contained in:
Jack Robison 2017-12-06 18:27:42 -05:00
parent 90aa89dcae
commit ac7ffdbf3a
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
5 changed files with 112 additions and 17 deletions

View file

@ -195,7 +195,7 @@ class Daemon(AuthJSONRPCServer):
self.connected_to_internet = True self.connected_to_internet = True
self.connection_status_code = None self.connection_status_code = None
self.platform = None self.platform = None
self.current_db_revision = 4 self.current_db_revision = 5
self.db_revision_file = conf.settings.get_db_revision_filename() self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None self.session = None
self.uploaded_temp_files = [] self.uploaded_temp_files = []
@ -662,7 +662,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(report) defer.returnValue(report)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, claim_dict, sd_hash, timeout=None, file_name=None): def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
@ -672,6 +672,7 @@ class Daemon(AuthJSONRPCServer):
def _download_finished(download_id, name, claim_dict): def _download_finished(download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict) report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_failed(error, download_id, name, claim_dict): def _download_failed(error, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict) report = yield self._get_stream_analytics_report(claim_dict)
@ -693,11 +694,11 @@ class Daemon(AuthJSONRPCServer):
file_name) file_name)
try: try:
lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
claim_dict), claim_dict),
lambda e: _download_failed(e, download_id, name, lambda e: _download_failed(e, download_id, name,
claim_dict)) claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
except Exception as err: except Exception as err:
yield _download_failed(err, download_id, name, claim_dict) yield _download_failed(err, download_id, name, claim_dict)
@ -731,6 +732,9 @@ class Daemon(AuthJSONRPCServer):
d = reupload.reflect_stream(publisher.lbry_file) d = reupload.reflect_stream(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception) log.exception)
yield self.stream_info_manager.save_outpoint_to_file(publisher.lbry_file.rowid,
claim_out['txid'],
int(claim_out['nout']))
self.analytics_manager.send_claim_action('publish') self.analytics_manager.send_claim_action('publish')
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
claim_out['nout']) claim_out['nout'])
@ -897,10 +901,7 @@ class Daemon(AuthJSONRPCServer):
file_status = yield lbry_file.status() file_status = yield lbry_file.status()
message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed,
file_status.num_known, file_status.running_status) file_status.num_known, file_status.running_status)
info = yield self.session.wallet.get_claim_metadata_for_sd_hash(lbry_file.sd_hash) outpoint = yield self.stream_info_manager.get_file_outpoint(lbry_file.rowid)
if info:
name, txid, nout = info
outpoint = "%s:%i" % (txid, nout)
result = { result = {
'completed': lbry_file.completed, 'completed': lbry_file.completed,
@ -1536,8 +1537,7 @@ class Daemon(AuthJSONRPCServer):
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))) "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")))
else: else:
resolved = resolved['claim'] resolved = resolved['claim']
txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
name = resolved['name']
claim_dict = ClaimDict.load_dict(resolved['value']) claim_dict = ClaimDict.load_dict(resolved['value'])
sd_hash = claim_dict.source_hash sd_hash = claim_dict.source_hash
@ -1556,8 +1556,8 @@ class Daemon(AuthJSONRPCServer):
log.info('Already have a file for %s', name) log.info('Already have a file for %s', name)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
else: else:
result = yield self._download_name(name, claim_dict, sd_hash, timeout=timeout, result = yield self._download_name(name, claim_dict, sd_hash, txid, nout,
file_name=file_name) timeout=timeout, file_name=file_name)
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)

View file

@ -13,6 +13,9 @@ def migrate_db(db_dir, start, end):
elif current == 3: elif current == 3:
from lbrynet.db_migrator.migrate3to4 import do_migration from lbrynet.db_migrator.migrate3to4 import do_migration
do_migration(db_dir) do_migration(db_dir)
elif current == 4:
from lbrynet.db_migrator.migrate4to5 import do_migration
do_migration(db_dir)
else: else:
raise Exception( raise Exception(
"DB migration of version {} to {} is not available".format(current, current+1)) "DB migration of version {} to {} is not available".format(current, current+1))

View file

@ -81,5 +81,6 @@ def migrate_blobs_db(db_dir):
log.error("Some how not all blobs were marked as announceable") log.error("Some how not all blobs were marked as announceable")
blobs_db_file.commit() blobs_db_file.commit()
blobs_db_file.close()
lbryfile_info_file.close()

View file

@ -0,0 +1,63 @@
import sqlite3
import os
import logging
log = logging.getLogger(__name__)
def do_migration(db_dir):
log.info("Doing the migration")
add_lbry_file_metadata(db_dir)
log.info("Migration succeeded")
def add_lbry_file_metadata(db_dir):
"""
We migrate the blobs.db used in BlobManager to have a "should_announce" column,
and set this to True for blobs that are sd_hash's or head blobs (first blob in stream)
"""
name_metadata = os.path.join(db_dir, "blockchainname.db")
lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db')
if not os.path.isfile(name_metadata) and not os.path.isfile(lbryfile_info_db):
return
if not os.path.isfile(lbryfile_info_db):
log.error(
"blockchainname.db was not found but lbryfile_info.db was found, skipping migration")
return
name_metadata_db = sqlite3.connect(name_metadata)
lbryfile_db = sqlite3.connect(lbryfile_info_db)
name_metadata_cursor = name_metadata_db.cursor()
lbryfile_cursor = lbryfile_db.cursor()
lbryfile_db.executescript(
"create table if not exists lbry_file_metadata (" +
" lbry_file integer primary key, " +
" txid text, " +
" n integer, " +
" foreign key(lbry_file) references lbry_files(rowid)"
")")
_files = lbryfile_cursor.execute("select rowid, stream_hash from lbry_files").fetchall()
lbry_files = {x[1]: x[0] for x in _files}
for (sd_hash, stream_hash) in lbryfile_cursor.execute("select * "
"from lbry_file_descriptors").fetchall():
lbry_file_id = lbry_files[stream_hash]
outpoint = name_metadata_cursor.execute("select txid, n from name_metadata "
"where sd_hash=?",
(sd_hash,)).fetchall()
if outpoint:
txid, nout = outpoint[0]
lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)",
(lbry_file_id, txid, nout))
else:
lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)",
(lbry_file_id, None, None))
lbryfile_db.commit()
lbryfile_db.close()
name_metadata_db.close()

View file

@ -124,6 +124,12 @@ class DBEncryptedFileMetadataManager(object):
" stream_hash text," " stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" + " foreign key(stream_hash) references lbry_files(stream_hash)" +
")") ")")
transaction.execute("create table if not exists lbry_file_metadata (" +
" lbry_file integer primary key, " +
" txid text, " +
" n integer, " +
" foreign key(lbry_file) references lbry_files(rowid)"
")")
def _open_db(self): def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due # check_same_thread=False is solely to quiet a spurious error that appears to be due
@ -132,19 +138,40 @@ class DBEncryptedFileMetadataManager(object):
# threads. # threads.
return self.db_conn.runInteraction(self._create_tables) return self.db_conn.runInteraction(self._create_tables)
@rerun_if_locked
@defer.inlineCallbacks
def get_file_outpoint(self, rowid):
result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata "
"where lbry_file=?", (rowid, ))
response = None
if result:
txid, nout = result[0]
if txid is not None and nout is not None:
response = "%s:%i" % (txid, nout)
defer.returnValue(response)
@rerun_if_locked
@defer.inlineCallbacks
def save_outpoint_to_file(self, rowid, txid, nout):
existing_outpoint = yield self.get_file_outpoint(rowid)
if not existing_outpoint:
yield self.db_conn.runOperation("insert into lbry_file_metadata values "
"(?, ?, ?)", (rowid, txid, nout))
@rerun_if_locked @rerun_if_locked
def _delete_stream(self, stream_hash): def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) "select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback( d.addCallback(
lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash))) lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, s_h): def do_delete(transaction, row_id, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,))
d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h))
return d return d
@rerun_if_locked @rerun_if_locked
@ -294,7 +321,8 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked @rerun_if_locked
def _get_all_lbry_files(self): def _get_all_lbry_files(self):
d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") d = self.db_conn.runQuery("select rowid, stream_hash, "
"blob_data_rate from lbry_file_options")
return d return d
@rerun_if_locked @rerun_if_locked