shut down session last, actually save LBRY file metadata

This commit is contained in:
Jimmy Kiselak 2015-09-15 00:29:18 -04:00
parent 9b904a7b4a
commit 4b407a9dd2
4 changed files with 28 additions and 12 deletions

View file

@ -82,11 +82,14 @@ class DiskBlobManager(BlobManager):
self._next_manage_call = None self._next_manage_call = None
def setup(self): def setup(self):
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
str(self.db_file))
d = self._open_db() d = self._open_db()
d.addCallback(lambda _: self._manage()) d.addCallback(lambda _: self._manage())
return d return d
def stop(self): def stop(self):
log.info("Stopping the DiskBlobManager")
if self._next_manage_call is not None and self._next_manage_call.active(): if self._next_manage_call is not None and self._next_manage_call.active():
self._next_manage_call.cancel() self._next_manage_call.cancel()
self._next_manage_call = None self._next_manage_call = None

View file

@ -28,16 +28,20 @@ class ConnectionManager(object):
self._peer_connections = {} # {Peer: PeerConnectionHandler} self._peer_connections = {} # {Peer: PeerConnectionHandler}
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
self._next_manage_call = None self._next_manage_call = None
self.stopped = True
def start(self): def start(self):
from twisted.internet import reactor from twisted.internet import reactor
self.stopped = False
if self._next_manage_call is not None and self._next_manage_call.active() is True: if self._next_manage_call is not None and self._next_manage_call.active() is True:
self._next_manage_call.cancel() self._next_manage_call.cancel()
self._next_manage_call = reactor.callLater(0, self._manage) self._next_manage_call = reactor.callLater(0, self._manage)
return defer.succeed(True) return defer.succeed(True)
def stop(self): def stop(self):
self.stopped = True
if self._next_manage_call is not None and self._next_manage_call.active() is True: if self._next_manage_call is not None and self._next_manage_call.active() is True:
self._next_manage_call.cancel() self._next_manage_call.cancel()
self._next_manage_call = None self._next_manage_call = None
@ -71,7 +75,7 @@ class ConnectionManager(object):
log.debug("Trying to get the next request for peer %s", str(peer)) log.debug("Trying to get the next request for peer %s", str(peer))
if not peer in self._peer_connections: if not peer in self._peer_connections or self.stopped is True:
log.debug("The peer has already been told to shut down.") log.debug("The peer has already been told to shut down.")
return defer.succeed(False) return defer.succeed(False)
@ -142,7 +146,7 @@ class ConnectionManager(object):
from twisted.internet import reactor from twisted.internet import reactor
if peer is not None: if peer is not None and self.stopped is False:
log.debug("Trying to connect to %s", str(peer)) log.debug("Trying to connect to %s", str(peer))
factory = ClientProtocolFactory(peer, self.rate_limiter, self) factory = ClientProtocolFactory(peer, self.rate_limiter, self)
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],

View file

@ -59,6 +59,9 @@ class LBRYFileManager(object):
d.addCallback(get_options) d.addCallback(get_options)
return d return d
def save_lbry_file(self, stream_hash, data_payment_rate):
return self._save_lbry_file(stream_hash, data_payment_rate)
def get_lbry_file_status(self, stream_hash): def get_lbry_file_status(self, stream_hash):
return self._get_lbry_file_status(stream_hash) return self._get_lbry_file_status(stream_hash)
@ -97,7 +100,7 @@ class LBRYFileManager(object):
def set_options_and_restore(stream_hash, options): def set_options_and_restore(stream_hash, options):
payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
d = self.add_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0]) d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0])
d.addCallback(lambda downloader: downloader.restore()) d.addCallback(lambda downloader: downloader.restore())
return d return d
@ -114,7 +117,7 @@ class LBRYFileManager(object):
d.addCallback(start_lbry_files) d.addCallback(start_lbry_files)
return d return d
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder, lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder,
self.session.rate_limiter, self.session.blob_manager, self.session.rate_limiter, self.session.blob_manager,
@ -123,11 +126,15 @@ class LBRYFileManager(object):
self.download_directory, self.download_directory,
upload_allowed) upload_allowed)
self.lbry_files.append(lbry_file_downloader) self.lbry_files.append(lbry_file_downloader)
d = self.set_lbry_file_data_payment_rate(stream_hash, blob_data_rate) d = lbry_file_downloader.set_stream_info()
d.addCallback(lambda _: lbry_file_downloader.set_stream_info())
d.addCallback(lambda _: lbry_file_downloader) d.addCallback(lambda _: lbry_file_downloader)
return d return d
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
d = self._save_lbry_file(stream_hash, blob_data_rate)
d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed))
return d
def delete_lbry_file(self, stream_hash): def delete_lbry_file(self, stream_hash):
for l in self.lbry_files: for l in self.lbry_files:
if l.stream_hash == stream_hash: if l.stream_hash == stream_hash:
@ -206,8 +213,6 @@ class LBRYFileManager(object):
# threads. # threads.
self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"), self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"),
check_same_thread=False) check_same_thread=False)
#self.unql_db = unqlite.UnQLite(os.path.join(self.session.db_dir, "lbryfile_manager.db"))
return self.sql_db.runQuery("create table if not exists lbry_file_options (" + return self.sql_db.runQuery("create table if not exists lbry_file_options (" +
" blob_data_rate real, " + " blob_data_rate real, " +
" status text," + " status text," +
@ -215,6 +220,12 @@ class LBRYFileManager(object):
" foreign key(stream_hash) references lbry_files(stream_hash)" + " foreign key(stream_hash) references lbry_files(stream_hash)" +
")") ")")
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)",
(data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED,
stream_hash))
@rerun_if_locked @rerun_if_locked
def _get_lbry_file_options(self, stream_hash): def _get_lbry_file_options(self, stream_hash):
d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?", d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?",

View file

@ -103,11 +103,9 @@ class LBRYConsole():
def shut_down(self): def shut_down(self):
"""Stop the session, all currently running streams, and stop the server""" """Stop the session, all currently running streams, and stop the server"""
d = self._shut_down()
if self.session is not None: if self.session is not None:
d = self.session.shut_down() d.addCallback(lambda _: self.session.shut_down())
else:
d = defer.succeed(True)
d.addCallback(lambda _: self._shut_down())
return d return d
def add_control_handlers(self, control_handlers): def add_control_handlers(self, control_handlers):