move db functions in EncryptedFileManger to EncryptedFileMetadataManager

- remove TempEncryptedFileMetadataManager, run tests with the normal
DBEncryptedFileMetadataManager
This commit is contained in:
Jack Robison 2017-12-06 16:49:49 -05:00
parent f9dee51ca4
commit 1594c6a831
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
7 changed files with 110 additions and 204 deletions

View file

@ -539,7 +539,6 @@ class Daemon(AuthJSONRPCServer):
log.info('Starting to setup up file manager') log.info('Starting to setup up file manager')
self.startup_status = STARTUP_STAGES[3] self.startup_status = STARTUP_STAGES[3]
self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
yield self.stream_info_manager.setup()
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.session,
self.stream_info_manager, self.stream_info_manager,

View file

@ -5,7 +5,6 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi
import logging import logging
import os import os
from twisted.enterprise import adbapi
from twisted.internet import defer, task, reactor from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -16,7 +15,6 @@ from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDow
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet import conf from lbrynet import conf
@ -41,7 +39,6 @@ class EncryptedFileManager(object):
# TODO: why is sd_identifier part of the file manager? # TODO: why is sd_identifier part of the file manager?
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.lbry_files = [] self.lbry_files = []
self.sql_db = None
if download_directory: if download_directory:
self.download_directory = download_directory self.download_directory = download_directory
else: else:
@ -51,7 +48,7 @@ class EncryptedFileManager(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def setup(self): def setup(self):
yield self._open_db() yield self.stream_info_manager.setup()
yield self._add_to_sd_identifier() yield self._add_to_sd_identifier()
# don't block on starting the lbry files # don't block on starting the lbry files
self._start_lbry_files() self._start_lbry_files()
@ -252,84 +249,32 @@ class EncryptedFileManager(object):
def stop(self): def stop(self):
safe_stop_looping_call(self.lbry_file_reflector) safe_stop_looping_call(self.lbry_file_reflector)
yield defer.DeferredList(list(self._stop_lbry_files())) yield defer.DeferredList(list(self._stop_lbry_files()))
if self.sql_db:
yield self.sql_db.close()
self.sql_db = None
log.info("Stopped encrypted file manager") log.info("Stopped encrypted file manager")
defer.returnValue(True) defer.returnValue(True)
def get_count_for_stream_hash(self, stream_hash): def get_count_for_stream_hash(self, stream_hash):
return self._get_count_for_stream_hash(stream_hash) return self._get_count_for_stream_hash(stream_hash)
######### database calls #########
def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
self.sql_db = adbapi.ConnectionPool(
"sqlite3",
os.path.join(self.session.db_dir, "lbryfile_info.db"),
check_same_thread=False
)
return self.sql_db.runQuery(
"create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")"
)
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
def do_save(db_transaction):
row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
return db_transaction.lastrowid
return self.sql_db.runInteraction(do_save)
@rerun_if_locked
def _delete_lbry_file_options(self, rowid):
return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?",
(rowid,))
@rerun_if_locked
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.sql_db.runQuery(
"update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
@rerun_if_locked
def _get_all_lbry_files(self):
d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options")
return d
@rerun_if_locked
def _change_file_status(self, rowid, new_status):
d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
(new_status, rowid))
d.addCallback(lambda _: new_status)
return d
@rerun_if_locked
def _get_lbry_file_status(self, rowid):
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash): def _get_count_for_stream_hash(self, stream_hash):
d = self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", return self.stream_info_manager._get_count_for_stream_hash(stream_hash)
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if r else 0)) def _delete_lbry_file_options(self, rowid):
return d return self.stream_info_manager._delete_lbry_file_options(rowid)
def _save_lbry_file(self, stream_hash, data_payment_rate):
return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate)
def _get_all_lbry_files(self):
return self.stream_info_manager._get_all_lbry_files()
@rerun_if_locked
def _get_rowid_for_stream_hash(self, stream_hash): def _get_rowid_for_stream_hash(self, stream_hash):
d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash)
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if len(r) else None)) def _change_file_status(self, rowid, status):
return d return self.stream_info_manager._change_file_status(rowid, status)
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate)
def _get_lbry_file_status(self, rowid):
return self.stream_info_manager._get_lbry_file_status(rowid)

View file

@ -6,7 +6,7 @@ from twisted.python.failure import Failure
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -14,11 +14,12 @@ log = logging.getLogger(__name__)
class DBEncryptedFileMetadataManager(object): class DBEncryptedFileMetadataManager(object):
"""Store and provide access to LBRY file metadata using sqlite""" """Store and provide access to LBRY file metadata using sqlite"""
def __init__(self, db_dir): def __init__(self, db_dir, file_name=None):
self.db_dir = db_dir self.db_dir = db_dir
self.stream_info_db = None self._db_file_name = file_name or "lbryfile_info.db"
self.stream_blob_db = None self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir,
self.stream_desc_db = None self._db_file_name),
check_same_thread=False)
def setup(self): def setup(self):
return self._open_db() return self._open_db()
@ -96,38 +97,40 @@ class DBEncryptedFileMetadataManager(object):
def get_stream_hash_for_sd_hash(self, sd_hash): def get_stream_hash_for_sd_hash(self, sd_hash):
return self._get_stream_hash_for_sd_blob_hash(sd_hash) return self._get_stream_hash_for_sd_blob_hash(sd_hash)
@staticmethod
def _create_tables(transaction):
transaction.execute("create table if not exists lbry_files (" +
" stream_hash text primary key, " +
" key text, " +
" stream_name text, " +
" suggested_file_name text" +
")")
transaction.execute("create table if not exists lbry_file_blobs (" +
" blob_hash text, " +
" stream_hash text, " +
" position integer, " +
" iv text, " +
" length integer, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_descriptors (" +
" sd_blob_hash TEXT PRIMARY KEY, " +
" stream_hash TEXT, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
def _open_db(self): def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due # check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the # to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple # one that opened it. The individual connections in the pool are not used in multiple
# threads. # threads.
self.db_conn = adbapi.ConnectionPool( return self.db_conn.runInteraction(self._create_tables)
"sqlite3",
(os.path.join(self.db_dir, "lbryfile_info.db")),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists lbry_files (" +
" stream_hash text primary key, " +
" key text, " +
" stream_name text, " +
" suggested_file_name text" +
")")
transaction.execute("create table if not exists lbry_file_blobs (" +
" blob_hash text, " +
" stream_hash text, " +
" position integer, " +
" iv text, " +
" length integer, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_descriptors (" +
" sd_blob_hash TEXT PRIMARY KEY, " +
" stream_hash TEXT, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
return self.db_conn.runInteraction(create_tables)
@rerun_if_locked @rerun_if_locked
def _delete_stream(self, stream_hash): def _delete_stream(self, stream_hash):
@ -269,97 +272,55 @@ class DBEncryptedFileMetadataManager(object):
d.addCallback(_handle_result) d.addCallback(_handle_result)
return d return d
# used by lbry file manager
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
def do_save(db_transaction):
row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
return db_transaction.lastrowid
return self.db_conn.runInteraction(do_save)
class TempEncryptedFileMetadataManager(object): @rerun_if_locked
def __init__(self): def _delete_lbry_file_options(self, rowid):
self.streams = {} return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?",
self.stream_blobs = {} (rowid,))
self.sd_files = {}
def setup(self): @rerun_if_locked
return defer.succeed(True) def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.db_conn.runQuery(
"update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
def stop(self): @rerun_if_locked
return defer.succeed(True) def _get_all_lbry_files(self):
d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options")
def get_all_streams(self):
return defer.succeed(self.streams.keys())
def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs):
self.streams[stream_hash] = {'suggested_file_name': suggested_file_name,
'stream_name': file_name,
'key': key}
d = self.add_blobs_to_stream(stream_hash, blobs)
d.addCallback(lambda _: stream_hash)
return d return d
def get_stream_info(self, stream_hash): @rerun_if_locked
if stream_hash in self.streams: def _change_file_status(self, rowid, new_status):
stream_info = self.streams[stream_hash] d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?",
return defer.succeed([stream_info['key'], stream_info['stream_name'], (new_status, rowid))
stream_info['suggested_file_name']]) d.addCallback(lambda _: new_status)
return defer.succeed(None) return d
def delete_stream(self, stream_hash): @rerun_if_locked
if stream_hash in self.streams: def _get_lbry_file_status(self, rowid):
del self.streams[stream_hash] d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?",
for (s_h, b_h) in self.stream_blobs.keys(): (rowid,))
if s_h == stream_hash: d.addCallback(lambda r: (r[0][0] if len(r) else None))
del self.stream_blobs[(s_h, b_h)] return d
return defer.succeed(True)
def add_blobs_to_stream(self, stream_hash, blobs): @rerun_if_locked
assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known" def _get_count_for_stream_hash(self, stream_hash):
for blob in blobs: d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
info = {} (stream_hash,))
info['blob_num'] = blob.blob_num d.addCallback(lambda r: (r[0][0] if r else 0))
info['length'] = blob.length return d
info['iv'] = blob.iv
self.stream_blobs[(stream_hash, blob.blob_hash)] = info
return defer.succeed(True)
def get_blobs_for_stream(self, stream_hash, start_blob=None, @rerun_if_locked
end_blob=None, count=None, reverse=False): def _get_rowid_for_stream_hash(self, stream_hash):
d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?",
if start_blob is not None: (stream_hash,))
start_num = self._get_blob_num_by_hash(stream_hash, start_blob) d.addCallback(lambda r: (r[0][0] if len(r) else None))
else: return d
start_num = None
if end_blob is not None:
end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
else:
end_num = None
return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse)
def get_stream_of_blob(self, blob_hash):
for (s_h, b_h) in self.stream_blobs.iterkeys():
if b_h == blob_hash:
return defer.succeed(s_h)
return defer.succeed(None)
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
blob_infos = []
for (s_h, b_h), info in self.stream_blobs.iteritems():
if stream_hash == s_h:
position = info['blob_num']
length = info['length']
iv = info['iv']
if (start_num is None) or (position > start_num):
if (end_num is None) or (position < end_num):
blob_infos.append((b_h, position, iv, length))
blob_infos.sort(key=lambda i: i[1], reverse=reverse)
if count is not None:
blob_infos = blob_infos[:count]
return defer.succeed(blob_infos)
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
if (stream_hash, blob_hash) in self.stream_blobs:
return defer.succeed(self.stream_blobs[(stream_hash, blob_hash)]['blob_num'])
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
self.sd_files[sd_blob_hash] = stream_hash
return defer.succeed(True)
def get_sd_blob_hashes_for_stream(self, stream_hash):
return defer.succeed(
[sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h])

View file

@ -10,7 +10,6 @@ import unittest
from Crypto import Random from Crypto import Random
from Crypto.Hash import MD5 from Crypto.Hash import MD5
from lbrynet import conf from lbrynet import conf
from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
@ -120,7 +119,7 @@ class LbryUploader(object):
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager() stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier) self.session, stream_info_manager, self.sd_identifier)
if self.ul_rate_limit is not None: if self.ul_rate_limit is not None:
@ -227,7 +226,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1") external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager() stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier) lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
@ -520,7 +519,7 @@ class TestTransfer(TestCase):
blob_tracker_class=DummyBlobAvailabilityTracker, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier) self.session, self.stream_info_manager, sd_identifier)
@ -820,7 +819,7 @@ class TestTransfer(TestCase):
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1") external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier) self.session, self.stream_info_manager, sd_identifier)

View file

@ -7,7 +7,6 @@ from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads from twisted.internet import defer, threads
from lbrynet import conf from lbrynet import conf
from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
@ -80,7 +79,7 @@ class TestStreamify(TestCase):
is_generous=self.is_generous, external_ip="127.0.0.1" is_generous=self.is_generous, external_ip="127.0.0.1"
) )
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier) self.session, self.stream_info_manager, sd_identifier)

View file

@ -7,6 +7,7 @@ from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
from lbrynet.core.Error import NoSuchStreamHash from lbrynet.core.Error import NoSuchStreamHash
from lbrynet.tests.util import random_lbry_hash from lbrynet.tests.util import random_lbry_hash
class DBEncryptedFileMetadataManagerTest(unittest.TestCase): class DBEncryptedFileMetadataManagerTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.db_dir = tempfile.mkdtemp() self.db_dir = tempfile.mkdtemp()

View file

@ -3,8 +3,10 @@ from twisted.trial import unittest
from lbrynet import conf from lbrynet import conf
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.tests.util import random_lbry_hash from lbrynet.tests.util import random_lbry_hash
class TestEncryptedFileManager(unittest.TestCase): class TestEncryptedFileManager(unittest.TestCase):
def setUp(self): def setUp(self):
@ -19,12 +21,12 @@ class TestEncryptedFileManager(unittest.TestCase):
session = MocSession() session = MocSession()
session.db_dir = '.' session.db_dir = '.'
stream_info_manager = None stream_info_manager = DBEncryptedFileMetadataManager('.')
sd_identifier = None sd_identifier = None
download_directory = '.' download_directory = '.'
manager = EncryptedFileManager( manager = EncryptedFileManager(
session, stream_info_manager, sd_identifier, download_directory) session, stream_info_manager, sd_identifier, download_directory)
yield manager._open_db() yield manager.stream_info_manager.setup()
out = yield manager._get_all_lbry_files() out = yield manager._get_all_lbry_files()
self.assertEqual(len(out), 0) self.assertEqual(len(out), 0)