Merge branch 'refactor-file-manager'

commit 8c865d4187
Author: Jack Robison
Date:   2017-12-06 20:21:27 -05:00
GPG key ID: 284699E7404E3CFF (no known key found for this signature in database)

13 changed files with 248 additions and 246 deletions


@@ -27,7 +27,7 @@ at anytime.
 * Check claim schema in `publish` before trying to make the claim, return better error messages
 * Renamed `channel_list_mine` to `channel_list`
 * Changed `channel_list` to include channels where the certificate info has been imported but the claim is not in the wallet
-* Changed `file_list`, `file_delete`, `file_set_status`, and `file_reflect` to no longer return claim related information.
+* Changed file objects returned by `file_list` and `get` to no longer contain `name`, `claim_id`, or `metadata`
 * Increased assumption for time it takes to announce single hash from 1 second to 5 seconds
 * Don't set HTTP error codes for failed api requests, conform to http://www.jsonrpc.org/specification#error_object
 * Return less verbose tracebacks for api requests resulting in errors
@@ -39,6 +39,7 @@ at anytime.
 * Added `claim_renew` command
 * Added user configurable `auto_renew_claim_height_delta` setting, defaults to 0 (off)
 * Added `lbrynet-console`, a tool to run or connect to lbrynet-daemon and launch an interactive python console with the api functions built in.
+* Added a table to the lbry file database to store the outpoint of the claim the file was downloaded from

 ### Removed
 * Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect`
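
Review note: taken together, these changelog entries mean items returned by `file_list` and `get` no longer carry claim fields; the claim is identified only by its outpoint. A minimal sketch of the new item shape, with hypothetical values and the field list abridged (the full list appears in the `file_list` docstring further down):

    # Hypothetical item returned by `file_list`/`get` after this change (abridged);
    # the claim fields ('name', 'claim_id', 'metadata') are gone and the claim is
    # identified by a single "txid:nout" outpoint string instead.
    example_file = {
        'completed': True,
        'file_name': 'example.mp4',
        'sd_hash': 'b5c6e2...',         # placeholder hash
        'written_bytes': 1048576,
        'message': None,                # populated only when full_status is true
        'outpoint': 'a1b2c3...:0',      # "txid:nout"; None if no claim is stored
    }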


@@ -756,8 +756,6 @@ class Wallet(object):
         defer.returnValue(claim)

     @defer.inlineCallbacks
     def _handle_claim_result(self, results, update_caches=True):
         if not results:


@@ -195,7 +195,7 @@ class Daemon(AuthJSONRPCServer):
         self.connected_to_internet = True
         self.connection_status_code = None
         self.platform = None
-        self.current_db_revision = 4
+        self.current_db_revision = 5
         self.db_revision_file = conf.settings.get_db_revision_filename()
         self.session = None
         self.uploaded_temp_files = []
@@ -539,7 +539,6 @@ class Daemon(AuthJSONRPCServer):
         log.info('Starting to setup up file manager')
         self.startup_status = STARTUP_STAGES[3]
         self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
-        yield self.stream_info_manager.setup()
         self.lbry_file_manager = EncryptedFileManager(
             self.session,
             self.stream_info_manager,
@@ -663,7 +662,7 @@ class Daemon(AuthJSONRPCServer):
         defer.returnValue(report)

     @defer.inlineCallbacks
-    def _download_name(self, name, claim_dict, sd_hash, timeout=None, file_name=None):
+    def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None):
         """
         Add a lbry file to the file manager, start the download, and return the new lbry file.
         If it already exists in the file manager, return the existing lbry file
@@ -673,6 +672,7 @@
         def _download_finished(download_id, name, claim_dict):
             report = yield self._get_stream_analytics_report(claim_dict)
             self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)

         @defer.inlineCallbacks
         def _download_failed(error, download_id, name, claim_dict):
             report = yield self._get_stream_analytics_report(claim_dict)
@@ -694,11 +694,11 @@
                                                file_name)
         try:
             lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
+            yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
             finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
                                                                         claim_dict),
                                            lambda e: _download_failed(e, download_id, name,
                                                                       claim_dict))
             result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
         except Exception as err:
             yield _download_failed(err, download_id, name, claim_dict)
@@ -732,6 +732,9 @@
             d = reupload.reflect_stream(publisher.lbry_file)
             d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
                            log.exception)
+        yield self.stream_info_manager.save_outpoint_to_file(publisher.lbry_file.rowid,
+                                                             claim_out['txid'],
+                                                             int(claim_out['nout']))
         self.analytics_manager.send_claim_action('publish')
         log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
                  claim_out['nout'])
@@ -765,7 +768,7 @@
                 downloader.cancel()

         d = defer.succeed(None)
-        reactor.callLater(self.search_timeout, _check_est, d)
+        reactor.callLater(conf.settings['search_timeout'], _check_est, d)
         d.addCallback(
             lambda _: download_sd_blob(
                 self.session, sd_hash, self.session.payment_rate_manager))
@@ -891,15 +894,14 @@
         else:
             written_bytes = 0

+        size = message = outpoint = None
         if full_status:
             size = yield lbry_file.get_total_bytes()
             file_status = yield lbry_file.status()
             message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed,
                                              file_status.num_known, file_status.running_status)
-        else:
-            size = None
-            message = None
+            outpoint = yield self.stream_info_manager.get_file_outpoint(lbry_file.rowid)

         result = {
             'completed': lbry_file.completed,
@@ -917,6 +919,7 @@
             'total_bytes': size,
             'written_bytes': written_bytes,
             'message': message,
+            'outpoint': outpoint
         }
         defer.returnValue(result)
@@ -1312,9 +1315,10 @@
                     'download_path': (str) download path of file,
                     'mime_type': (str) mime type of file,
                     'key': (str) key attached to file,
-                    'total_bytes': (int) file size in bytes, None if full_status is false
-                    'written_bytes': (int) written size in bytes
-                    'message': (str), None if full_status is false
+                    'total_bytes': (int) file size in bytes, None if full_status is false,
+                    'written_bytes': (int) written size in bytes,
+                    'message': (str) status message, None if full_status is false,
+                    'outpoint': (str), None if full_status is false or if claim is not found
                 },
             ]
         """
@@ -1500,25 +1504,22 @@
         Returns:
             (dict) Dictionary containing information about the stream
             {
                 'completed': (bool) true if download is completed,
                 'file_name': (str) name of file,
                 'download_directory': (str) download directory,
                 'points_paid': (float) credit paid to download file,
                 'stopped': (bool) true if download is stopped,
                 'stream_hash': (str) stream hash of file,
                 'stream_name': (str) stream name,
                 'suggested_file_name': (str) suggested file name,
                 'sd_hash': (str) sd hash of file,
-                'name': (str) name claim attached to file
-                'outpoint': (str) claim outpoint attached to file
-                'claim_id': (str) claim ID attached to file,
                 'download_path': (str) download path of file,
                 'mime_type': (str) mime type of file,
                 'key': (str) key attached to file,
-                'total_bytes': (int) file size in bytes, None if full_status is false
-                'written_bytes': (int) written size in bytes
-                'message': (str), None if full_status is false
-                'metadata': (dict) Metadata dictionary
+                'total_bytes': (int) file size in bytes, None if full_status is false,
+                'written_bytes': (int) written size in bytes,
+                'message': (str) status message,
+                'outpoint': (str) claim outpoint
             }
         """
@@ -1536,8 +1537,7 @@
                 "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")))
         else:
             resolved = resolved['claim']
-            name = resolved['name']
+            txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
             claim_dict = ClaimDict.load_dict(resolved['value'])
             sd_hash = claim_dict.source_hash
@@ -1556,8 +1556,8 @@
             log.info('Already have a file for %s', name)
             result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
         else:
-            result = yield self._download_name(name, claim_dict, sd_hash, timeout=timeout,
-                                               file_name=file_name)
+            result = yield self._download_name(name, claim_dict, sd_hash, txid, nout,
+                                               timeout=timeout, file_name=file_name)
         response = yield self._render_response(result)
         defer.returnValue(response)
@@ -1669,7 +1669,7 @@
         Returns:
             (float) Estimated cost in lbry credits, returns None if uri is not
-                resolveable
+                resolvable
         """
         cost = yield self.get_est_cost(uri, size)
         defer.returnValue(cost)
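
Review note: the thread running through the Daemon changes above is that the resolved claim's (txid, nout) now travels with the download and is persisted next to the lbry file. A condensed sketch of that flow, with method bodies heavily abridged and the resolution wiring assumed (this is not the daemon's exact code):

    from twisted.internet import defer
    # ClaimDict is imported as in Daemon.py

    @defer.inlineCallbacks
    def sketch_download(daemon, resolved):
        # resolved: the dict produced by uri resolution, as in the hunks above
        txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
        claim_dict = ClaimDict.load_dict(resolved['value'])
        sd_hash = claim_dict.source_hash
        lbry_file, _finished_deferred = yield daemon.streams[sd_hash].start(claim_dict, name)
        # new in this merge: remember which claim outpoint the file came from
        yield daemon.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
        result = yield daemon._get_lbry_file_dict(lbry_file, full_status=True)
        defer.returnValue(result)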


@@ -224,7 +224,8 @@ class AuthJSONRPCServer(AuthorizedBase):
         else:
             # last resort, just cast it as a string
             error = JSONRPCError(str(failure))
-        log.warning("error processing api request: %s", error.message)
+        log.warning("error processing api request: %s\ntraceback: %s", error.message,
+                    "\n".join(error.traceback))
         response_content = jsonrpc_dumps_pretty(error, id=id_)
         self._set_headers(request, response_content)
         request.setResponseCode(200)
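
Review note: per the changelog, failed api requests now answer with HTTP 200 and a JSON-RPC 2.0 error object (http://www.jsonrpc.org/specification#error_object). For reference, a response following the spec looks roughly like the sketch below; the exact members lbrynet places under "data" are an assumption here, values hypothetical:

    error_response = {
        "jsonrpc": "2.0",
        "error": {
            "code": -32603,                    # spec-defined "Internal error"
            "message": "error processing api request",
            "data": {"traceback": ["..."]},    # where details like the traceback can go
        },
        "id": 1,
    }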


@@ -13,6 +13,9 @@ def migrate_db(db_dir, start, end):
     elif current == 3:
         from lbrynet.db_migrator.migrate3to4 import do_migration
         do_migration(db_dir)
+    elif current == 4:
+        from lbrynet.db_migrator.migrate4to5 import do_migration
+        do_migration(db_dir)
     else:
         raise Exception(
             "DB migration of version {} to {} is not available".format(current, current+1))


@@ -81,5 +81,6 @@ def migrate_blobs_db(db_dir):
         log.error("Some how not all blobs were marked as announceable")

     blobs_db_file.commit()
+    blobs_db_file.close()
+    lbryfile_info_file.close()


@@ -0,0 +1,63 @@
+import sqlite3
+import os
+import logging
+
+log = logging.getLogger(__name__)
+
+
+def do_migration(db_dir):
+    log.info("Doing the migration")
+    add_lbry_file_metadata(db_dir)
+    log.info("Migration succeeded")
+
+
+def add_lbry_file_metadata(db_dir):
+    """
+    Add a lbry_file_metadata table to the lbry file database, storing the claim
+    outpoint (txid, n) each lbry file was downloaded from, populated from the
+    existing blockchainname.db
+    """
+    name_metadata = os.path.join(db_dir, "blockchainname.db")
+    lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db')
+    if not os.path.isfile(name_metadata) and not os.path.isfile(lbryfile_info_db):
+        return
+    if not os.path.isfile(lbryfile_info_db):
+        log.error(
+            "lbryfile_info.db was not found but blockchainname.db was found, skipping migration")
+        return
+    name_metadata_db = sqlite3.connect(name_metadata)
+    lbryfile_db = sqlite3.connect(lbryfile_info_db)
+    name_metadata_cursor = name_metadata_db.cursor()
+    lbryfile_cursor = lbryfile_db.cursor()
+
+    lbryfile_db.executescript(
+        "create table if not exists lbry_file_metadata (" +
+        "    lbry_file integer primary key, " +
+        "    txid text, " +
+        "    n integer, " +
+        "    foreign key(lbry_file) references lbry_files(rowid)"
+        ")")
+
+    _files = lbryfile_cursor.execute("select rowid, stream_hash from lbry_files").fetchall()
+    lbry_files = {x[1]: x[0] for x in _files}
+    for (sd_hash, stream_hash) in lbryfile_cursor.execute("select * "
+                                                          "from lbry_file_descriptors").fetchall():
+        lbry_file_id = lbry_files[stream_hash]
+        outpoint = name_metadata_cursor.execute("select txid, n from name_metadata "
+                                                "where sd_hash=?",
+                                                (sd_hash,)).fetchall()
+        if outpoint:
+            txid, nout = outpoint[0]
+            lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)",
+                                    (lbry_file_id, txid, nout))
+        else:
+            lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)",
+                                    (lbry_file_id, None, None))
+    lbryfile_db.commit()
+
+    lbryfile_db.close()
+    name_metadata_db.close()
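
Review note: a quick way to sanity-check `do_migration` against fixture databases. The `name_metadata` schema below is trimmed to just the columns the migration reads (`txid`, `n`, `sd_hash`) and is an assumption about the real table:

    import os
    import sqlite3
    import tempfile
    from lbrynet.db_migrator.migrate4to5 import do_migration

    db_dir = tempfile.mkdtemp()
    # fixture blockchainname.db with one known sd_hash -> outpoint mapping
    name_db = sqlite3.connect(os.path.join(db_dir, "blockchainname.db"))
    name_db.execute("create table name_metadata (txid text, n integer, sd_hash text)")
    name_db.execute("insert into name_metadata values ('a1b2', 0, 'sd1')")
    name_db.commit()
    name_db.close()

    # fixture lbryfile_info.db with one lbry file tied to that sd_hash
    file_db = sqlite3.connect(os.path.join(db_dir, "lbryfile_info.db"))
    file_db.execute("create table lbry_files (stream_hash text primary key)")
    file_db.execute("create table lbry_file_descriptors (sd_blob_hash text, stream_hash text)")
    file_db.execute("insert into lbry_files values ('stream1')")
    file_db.execute("insert into lbry_file_descriptors values ('sd1', 'stream1')")
    file_db.commit()
    file_db.close()

    do_migration(db_dir)
    # lbry_file_metadata now holds (1, 'a1b2', 0) for the migrated file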


@@ -5,7 +5,6 @@ Keep track of which LBRY Files are downloading and store their LBRY File specific metadata
 import logging
 import os

-from twisted.enterprise import adbapi
 from twisted.internet import defer, task, reactor
 from twisted.python.failure import Failure
@@ -16,7 +15,6 @@ from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
 from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType
 from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
 from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
-from lbrynet.core.sqlite_helpers import rerun_if_locked
 from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
 from lbrynet import conf
@@ -41,7 +39,6 @@ class EncryptedFileManager(object):
         # TODO: why is sd_identifier part of the file manager?
         self.sd_identifier = sd_identifier
         self.lbry_files = []
-        self.sql_db = None
         if download_directory:
             self.download_directory = download_directory
         else:
@@ -51,7 +48,7 @@ class EncryptedFileManager(object):
     @defer.inlineCallbacks
     def setup(self):
-        yield self._open_db()
+        yield self.stream_info_manager.setup()
         yield self._add_to_sd_identifier()
         # don't block on starting the lbry files
         self._start_lbry_files()
@@ -252,84 +249,32 @@ class EncryptedFileManager(object):
     def stop(self):
         safe_stop_looping_call(self.lbry_file_reflector)
         yield defer.DeferredList(list(self._stop_lbry_files()))
-        if self.sql_db:
-            yield self.sql_db.close()
-        self.sql_db = None
         log.info("Stopped encrypted file manager")
         defer.returnValue(True)

     def get_count_for_stream_hash(self, stream_hash):
         return self._get_count_for_stream_hash(stream_hash)

-    ######### database calls #########
-
-    def _open_db(self):
-        # check_same_thread=False is solely to quiet a spurious error that appears to be due
-        # to a bug in twisted, where the connection is closed by a different thread than the
-        # one that opened it. The individual connections in the pool are not used in multiple
-        # threads.
-        self.sql_db = adbapi.ConnectionPool(
-            "sqlite3",
-            os.path.join(self.session.db_dir, "lbryfile_info.db"),
-            check_same_thread=False
-        )
-        return self.sql_db.runQuery(
-            "create table if not exists lbry_file_options (" +
-            "    blob_data_rate real, " +
-            "    status text," +
-            "    stream_hash text,"
-            "    foreign key(stream_hash) references lbry_files(stream_hash)" +
-            ")"
-        )
-
-    @rerun_if_locked
-    def _save_lbry_file(self, stream_hash, data_payment_rate):
-        def do_save(db_transaction):
-            row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
-            db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
-            return db_transaction.lastrowid
-        return self.sql_db.runInteraction(do_save)
-
-    @rerun_if_locked
-    def _delete_lbry_file_options(self, rowid):
-        return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?",
-                                    (rowid,))
-
-    @rerun_if_locked
-    def _set_lbry_file_payment_rate(self, rowid, new_rate):
-        return self.sql_db.runQuery(
-            "update lbry_file_options set blob_data_rate = ? where rowid = ?",
-            (new_rate, rowid))
-
-    @rerun_if_locked
-    def _get_all_lbry_files(self):
-        d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options")
-        return d
-
-    @rerun_if_locked
-    def _change_file_status(self, rowid, new_status):
-        d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
-                                 (new_status, rowid))
-        d.addCallback(lambda _: new_status)
-        return d
-
-    @rerun_if_locked
-    def _get_lbry_file_status(self, rowid):
-        d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
-                                 (rowid,))
-        d.addCallback(lambda r: (r[0][0] if len(r) else None))
-        return d
-
-    @rerun_if_locked
-    def _get_count_for_stream_hash(self, stream_hash):
-        d = self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
-                                 (stream_hash,))
-        d.addCallback(lambda r: (r[0][0] if r else 0))
-        return d
-
-    @rerun_if_locked
-    def _get_rowid_for_stream_hash(self, stream_hash):
-        d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?",
-                                 (stream_hash,))
-        d.addCallback(lambda r: (r[0][0] if len(r) else None))
-        return d
+    def _get_count_for_stream_hash(self, stream_hash):
+        return self.stream_info_manager._get_count_for_stream_hash(stream_hash)
+
+    def _delete_lbry_file_options(self, rowid):
+        return self.stream_info_manager._delete_lbry_file_options(rowid)
+
+    def _save_lbry_file(self, stream_hash, data_payment_rate):
+        return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate)
+
+    def _get_all_lbry_files(self):
+        return self.stream_info_manager._get_all_lbry_files()
+
+    def _get_rowid_for_stream_hash(self, stream_hash):
+        return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash)
+
+    def _change_file_status(self, rowid, status):
+        return self.stream_info_manager._change_file_status(rowid, status)
+
+    def _set_lbry_file_payment_rate(self, rowid, new_rate):
+        return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate)
+
+    def _get_lbry_file_status(self, rowid):
+        return self.stream_info_manager._get_lbry_file_status(rowid)


@@ -6,7 +6,7 @@ from twisted.python.failure import Failure
 from twisted.enterprise import adbapi

 from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
 from lbrynet.core.sqlite_helpers import rerun_if_locked
+from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader

 log = logging.getLogger(__name__)
@@ -14,11 +14,12 @@ log = logging.getLogger(__name__)
 class DBEncryptedFileMetadataManager(object):
     """Store and provide access to LBRY file metadata using sqlite"""

-    def __init__(self, db_dir):
+    def __init__(self, db_dir, file_name=None):
         self.db_dir = db_dir
-        self.stream_info_db = None
-        self.stream_blob_db = None
-        self.stream_desc_db = None
+        self._db_file_name = file_name or "lbryfile_info.db"
+        self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir,
+                                                                     self._db_file_name),
+                                             check_same_thread=False)

     def setup(self):
         return self._open_db()
@@ -96,52 +97,81 @@ class DBEncryptedFileMetadataManager(object):
     def get_stream_hash_for_sd_hash(self, sd_hash):
         return self._get_stream_hash_for_sd_blob_hash(sd_hash)

+    @staticmethod
+    def _create_tables(transaction):
+        transaction.execute("create table if not exists lbry_files (" +
+                            "    stream_hash text primary key, " +
+                            "    key text, " +
+                            "    stream_name text, " +
+                            "    suggested_file_name text" +
+                            ")")
+        transaction.execute("create table if not exists lbry_file_blobs (" +
+                            "    blob_hash text, " +
+                            "    stream_hash text, " +
+                            "    position integer, " +
+                            "    iv text, " +
+                            "    length integer, " +
+                            "    foreign key(stream_hash) references lbry_files(stream_hash)" +
+                            ")")
+        transaction.execute("create table if not exists lbry_file_descriptors (" +
+                            "    sd_blob_hash TEXT PRIMARY KEY, " +
+                            "    stream_hash TEXT, " +
+                            "    foreign key(stream_hash) references lbry_files(stream_hash)" +
+                            ")")
+        transaction.execute("create table if not exists lbry_file_options (" +
+                            "    blob_data_rate real, " +
+                            "    status text," +
+                            "    stream_hash text,"
+                            "    foreign key(stream_hash) references lbry_files(stream_hash)" +
+                            ")")
+        transaction.execute("create table if not exists lbry_file_metadata (" +
+                            "    lbry_file integer primary key, " +
+                            "    txid text, " +
+                            "    n integer, " +
+                            "    foreign key(lbry_file) references lbry_files(rowid)"
+                            ")")
+
     def _open_db(self):
         # check_same_thread=False is solely to quiet a spurious error that appears to be due
         # to a bug in twisted, where the connection is closed by a different thread than the
         # one that opened it. The individual connections in the pool are not used in multiple
         # threads.
-        self.db_conn = adbapi.ConnectionPool(
-            "sqlite3",
-            (os.path.join(self.db_dir, "lbryfile_info.db")),
-            check_same_thread=False)
-
-        def create_tables(transaction):
-            transaction.execute("create table if not exists lbry_files (" +
-                                "    stream_hash text primary key, " +
-                                "    key text, " +
-                                "    stream_name text, " +
-                                "    suggested_file_name text" +
-                                ")")
-            transaction.execute("create table if not exists lbry_file_blobs (" +
-                                "    blob_hash text, " +
-                                "    stream_hash text, " +
-                                "    position integer, " +
-                                "    iv text, " +
-                                "    length integer, " +
-                                "    foreign key(stream_hash) references lbry_files(stream_hash)" +
-                                ")")
-            transaction.execute("create table if not exists lbry_file_descriptors (" +
-                                "    sd_blob_hash TEXT PRIMARY KEY, " +
-                                "    stream_hash TEXT, " +
-                                "    foreign key(stream_hash) references lbry_files(stream_hash)" +
-                                ")")
-
-        return self.db_conn.runInteraction(create_tables)
+        return self.db_conn.runInteraction(self._create_tables)
+
+    @rerun_if_locked
+    @defer.inlineCallbacks
+    def get_file_outpoint(self, rowid):
+        result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata "
+                                             "where lbry_file=?", (rowid, ))
+        response = None
+        if result:
+            txid, nout = result[0]
+            if txid is not None and nout is not None:
+                response = "%s:%i" % (txid, nout)
+        defer.returnValue(response)
+
+    @rerun_if_locked
+    @defer.inlineCallbacks
+    def save_outpoint_to_file(self, rowid, txid, nout):
+        existing_outpoint = yield self.get_file_outpoint(rowid)
+        if not existing_outpoint:
+            yield self.db_conn.runOperation("insert into lbry_file_metadata values "
+                                            "(?, ?, ?)", (rowid, txid, nout))

     @rerun_if_locked
     def _delete_stream(self, stream_hash):
         d = self.db_conn.runQuery(
-            "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
+            "select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
         d.addCallback(
-            lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash)))
+            lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash)))

-        def do_delete(transaction, s_h):
+        def do_delete(transaction, row_id, s_h):
             transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
             transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,))
             transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,))
+            transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,))

-        d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h))
+        d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h))
         return d

     @rerun_if_locked
@@ -269,97 +299,56 @@ class DBEncryptedFileMetadataManager(object):
         d.addCallback(_handle_result)
         return d

+    # used by lbry file manager
+    @rerun_if_locked
+    def _save_lbry_file(self, stream_hash, data_payment_rate):
+        def do_save(db_transaction):
+            row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
+            db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
+            return db_transaction.lastrowid
+        return self.db_conn.runInteraction(do_save)
+
+    @rerun_if_locked
+    def _delete_lbry_file_options(self, rowid):
+        return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?",
+                                     (rowid,))
+
+    @rerun_if_locked
+    def _set_lbry_file_payment_rate(self, rowid, new_rate):
+        return self.db_conn.runQuery(
+            "update lbry_file_options set blob_data_rate = ? where rowid = ?",
+            (new_rate, rowid))
+
+    @rerun_if_locked
+    def _get_all_lbry_files(self):
+        d = self.db_conn.runQuery("select rowid, stream_hash, "
+                                  "blob_data_rate from lbry_file_options")
+        return d
+
+    @rerun_if_locked
+    def _change_file_status(self, rowid, new_status):
+        d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?",
+                                  (new_status, rowid))
+        d.addCallback(lambda _: new_status)
+        return d
+
+    @rerun_if_locked
+    def _get_lbry_file_status(self, rowid):
+        d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?",
+                                  (rowid,))
+        d.addCallback(lambda r: (r[0][0] if len(r) else None))
+        return d
+
+    @rerun_if_locked
+    def _get_count_for_stream_hash(self, stream_hash):
+        d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
+                                  (stream_hash,))
+        d.addCallback(lambda r: (r[0][0] if r else 0))
+        return d
+
+    @rerun_if_locked
+    def _get_rowid_for_stream_hash(self, stream_hash):
+        d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?",
+                                  (stream_hash,))
+        d.addCallback(lambda r: (r[0][0] if len(r) else None))
+        return d
-
-class TempEncryptedFileMetadataManager(object):
-    def __init__(self):
-        self.streams = {}
-        self.stream_blobs = {}
-        self.sd_files = {}
-
-    def setup(self):
-        return defer.succeed(True)
-
-    def stop(self):
-        return defer.succeed(True)
-
-    def get_all_streams(self):
-        return defer.succeed(self.streams.keys())
-
-    def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs):
-        self.streams[stream_hash] = {'suggested_file_name': suggested_file_name,
-                                     'stream_name': file_name,
-                                     'key': key}
-        d = self.add_blobs_to_stream(stream_hash, blobs)
-        d.addCallback(lambda _: stream_hash)
-        return d
-
-    def get_stream_info(self, stream_hash):
-        if stream_hash in self.streams:
-            stream_info = self.streams[stream_hash]
-            return defer.succeed([stream_info['key'], stream_info['stream_name'],
-                                  stream_info['suggested_file_name']])
-        return defer.succeed(None)
-
-    def delete_stream(self, stream_hash):
-        if stream_hash in self.streams:
-            del self.streams[stream_hash]
-            for (s_h, b_h) in self.stream_blobs.keys():
-                if s_h == stream_hash:
-                    del self.stream_blobs[(s_h, b_h)]
-        return defer.succeed(True)
-
-    def add_blobs_to_stream(self, stream_hash, blobs):
-        assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known"
-        for blob in blobs:
-            info = {}
-            info['blob_num'] = blob.blob_num
-            info['length'] = blob.length
-            info['iv'] = blob.iv
-            self.stream_blobs[(stream_hash, blob.blob_hash)] = info
-        return defer.succeed(True)
-
-    def get_blobs_for_stream(self, stream_hash, start_blob=None,
-                             end_blob=None, count=None, reverse=False):
-        if start_blob is not None:
-            start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
-        else:
-            start_num = None
-        if end_blob is not None:
-            end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
-        else:
-            end_num = None
-        return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse)
-
-    def get_stream_of_blob(self, blob_hash):
-        for (s_h, b_h) in self.stream_blobs.iterkeys():
-            if b_h == blob_hash:
-                return defer.succeed(s_h)
-        return defer.succeed(None)
-
-    def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
-        blob_infos = []
-        for (s_h, b_h), info in self.stream_blobs.iteritems():
-            if stream_hash == s_h:
-                position = info['blob_num']
-                length = info['length']
-                iv = info['iv']
-                if (start_num is None) or (position > start_num):
-                    if (end_num is None) or (position < end_num):
-                        blob_infos.append((b_h, position, iv, length))
-        blob_infos.sort(key=lambda i: i[1], reverse=reverse)
-        if count is not None:
-            blob_infos = blob_infos[:count]
-        return defer.succeed(blob_infos)
-
-    def _get_blob_num_by_hash(self, stream_hash, blob_hash):
-        if (stream_hash, blob_hash) in self.stream_blobs:
-            return defer.succeed(self.stream_blobs[(stream_hash, blob_hash)]['blob_num'])
-
-    def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
-        self.sd_files[sd_blob_hash] = stream_hash
-        return defer.succeed(True)
-
-    def get_sd_blob_hashes_for_stream(self, stream_hash):
-        return defer.succeed(
-            [sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h])
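
Review note: round-trip of the new outpoint helpers. `save_outpoint_to_file` is insert-only (it no-ops when a row already exists for the file), and `get_file_outpoint` renders the stored pair as a "txid:nout" string. A minimal sketch, with hypothetical values:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def sketch_outpoint_roundtrip(manager, rowid):
        # manager is a set-up DBEncryptedFileMetadataManager
        yield manager.save_outpoint_to_file(rowid, "a1b2c3", 0)
        outpoint = yield manager.get_file_outpoint(rowid)
        # outpoint == "a1b2c3:0"; None comes back if txid/nout were stored as NULL
        defer.returnValue(outpoint)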


@@ -10,7 +10,6 @@ import unittest
 from Crypto import Random
 from Crypto.Hash import MD5

 from lbrynet import conf
-from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
 from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
 from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
 from lbrynet.core.Session import Session
@@ -120,7 +119,7 @@ class LbryUploader(object):
             peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
             blob_tracker_class=DummyBlobAvailabilityTracker,
             dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
-        stream_info_manager = TempEncryptedFileMetadataManager()
+        stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
         self.lbry_file_manager = EncryptedFileManager(
             self.session, stream_info_manager, self.sd_identifier)
         if self.ul_rate_limit is not None:
@@ -227,7 +226,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
                           is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
                           external_ip="127.0.0.1")

-    stream_info_manager = TempEncryptedFileMetadataManager()
+    stream_info_manager = DBEncryptedFileMetadataManager(db_dir)

     lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
@@ -520,7 +519,7 @@ class TestTransfer(TestCase):
                 blob_tracker_class=DummyBlobAvailabilityTracker,
                 dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")

-            self.stream_info_manager = TempEncryptedFileMetadataManager()
+            self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)

             self.lbry_file_manager = EncryptedFileManager(
                 self.session, self.stream_info_manager, sd_identifier)
@@ -820,7 +819,7 @@ class TestTransfer(TestCase):
                 is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
                 external_ip="127.0.0.1")

-            self.stream_info_manager = TempEncryptedFileMetadataManager()
+            self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)

             self.lbry_file_manager = EncryptedFileManager(
                 self.session, self.stream_info_manager, sd_identifier)


@@ -7,7 +7,6 @@ from twisted.trial.unittest import TestCase
 from twisted.internet import defer, threads

 from lbrynet import conf
-from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
 from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
 from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
 from lbrynet.core.Session import Session
@@ -80,7 +79,7 @@ class TestStreamify(TestCase):
             is_generous=self.is_generous, external_ip="127.0.0.1"
         )

-        self.stream_info_manager = TempEncryptedFileMetadataManager()
+        self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)

         self.lbry_file_manager = EncryptedFileManager(
             self.session, self.stream_info_manager, sd_identifier)


@@ -7,6 +7,7 @@ from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
 from lbrynet.core.Error import NoSuchStreamHash
 from lbrynet.tests.util import random_lbry_hash


 class DBEncryptedFileMetadataManagerTest(unittest.TestCase):
     def setUp(self):
         self.db_dir = tempfile.mkdtemp()


@@ -3,8 +3,10 @@ from twisted.trial import unittest
 from lbrynet import conf
 from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
 from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
+from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
 from lbrynet.tests.util import random_lbry_hash


 class TestEncryptedFileManager(unittest.TestCase):

     def setUp(self):
@@ -19,12 +21,12 @@ class TestEncryptedFileManager(unittest.TestCase):
         session = MocSession()
         session.db_dir = '.'
-        stream_info_manager = None
+        stream_info_manager = DBEncryptedFileMetadataManager('.')
         sd_identifier = None
         download_directory = '.'
         manager = EncryptedFileManager(
             session, stream_info_manager, sd_identifier, download_directory)
-        yield manager._open_db()
+        yield manager.stream_info_manager.setup()
         out = yield manager._get_all_lbry_files()
         self.assertEqual(len(out), 0)