From 4b407a9dd2656c8b3df59c2bd0c4f6c866a909fb Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Tue, 15 Sep 2015 00:29:18 -0400 Subject: [PATCH] shut down session last, actually save LBRY file metadata --- lbrynet/core/BlobManager.py | 3 +++ lbrynet/core/client/ConnectionManager.py | 8 ++++++-- lbrynet/lbryfilemanager/LBRYFileManager.py | 23 ++++++++++++++++------ lbrynet/lbrynet_console/LBRYConsole.py | 6 ++---- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index a30585bb3..f7bd7a4bc 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -82,11 +82,14 @@ class DiskBlobManager(BlobManager): self._next_manage_call = None def setup(self): + log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), + str(self.db_file)) d = self._open_db() d.addCallback(lambda _: self._manage()) return d def stop(self): + log.info("Stopping the DiskBlobManager") if self._next_manage_call is not None and self._next_manage_call.active(): self._next_manage_call.cancel() self._next_manage_call = None diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index cdf6fe697..990f9a551 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -28,16 +28,20 @@ class ConnectionManager(object): self._peer_connections = {} # {Peer: PeerConnectionHandler} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._next_manage_call = None + self.stopped = True def start(self): from twisted.internet import reactor + self.stopped = False + if self._next_manage_call is not None and self._next_manage_call.active() is True: self._next_manage_call.cancel() self._next_manage_call = reactor.callLater(0, self._manage) return defer.succeed(True) def stop(self): + self.stopped = True if self._next_manage_call is not None and self._next_manage_call.active() is True: self._next_manage_call.cancel() self._next_manage_call = None @@ -71,7 +75,7 @@ class ConnectionManager(object): log.debug("Trying to get the next request for peer %s", str(peer)) - if not peer in self._peer_connections: + if not peer in self._peer_connections or self.stopped is True: log.debug("The peer has already been told to shut down.") return defer.succeed(False) @@ -142,7 +146,7 @@ class ConnectionManager(object): from twisted.internet import reactor - if peer is not None: + if peer is not None and self.stopped is False: log.debug("Trying to connect to %s", str(peer)) factory = ClientProtocolFactory(peer, self.rate_limiter, self) self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index c86799db6..f27f6052e 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -59,6 +59,9 @@ class LBRYFileManager(object): d.addCallback(get_options) return d + def save_lbry_file(self, stream_hash, data_payment_rate): + return self._save_lbry_file(stream_hash, data_payment_rate) + def get_lbry_file_status(self, stream_hash): return self._get_lbry_file_status(stream_hash) @@ -97,7 +100,7 @@ class LBRYFileManager(object): def set_options_and_restore(stream_hash, options): payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) - d = self.add_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0]) + d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0]) d.addCallback(lambda downloader: downloader.restore()) return d @@ -114,7 +117,7 @@ class LBRYFileManager(object): d.addCallback(start_lbry_files) return d - def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): + def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): payment_rate_manager.min_blob_data_payment_rate = blob_data_rate lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder, self.session.rate_limiter, self.session.blob_manager, @@ -123,11 +126,15 @@ class LBRYFileManager(object): self.download_directory, upload_allowed) self.lbry_files.append(lbry_file_downloader) - d = self.set_lbry_file_data_payment_rate(stream_hash, blob_data_rate) - d.addCallback(lambda _: lbry_file_downloader.set_stream_info()) + d = lbry_file_downloader.set_stream_info() d.addCallback(lambda _: lbry_file_downloader) return d + def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): + d = self._save_lbry_file(stream_hash, blob_data_rate) + d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed)) + return d + def delete_lbry_file(self, stream_hash): for l in self.lbry_files: if l.stream_hash == stream_hash: @@ -206,8 +213,6 @@ class LBRYFileManager(object): # threads. self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"), check_same_thread=False) - #self.unql_db = unqlite.UnQLite(os.path.join(self.session.db_dir, "lbryfile_manager.db")) - return self.sql_db.runQuery("create table if not exists lbry_file_options (" + " blob_data_rate real, " + " status text," + @@ -215,6 +220,12 @@ class LBRYFileManager(object): " foreign key(stream_hash) references lbry_files(stream_hash)" + ")") + @rerun_if_locked + def _save_lbry_file(self, stream_hash, data_payment_rate): + return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)", + (data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED, + stream_hash)) + @rerun_if_locked def _get_lbry_file_options(self, stream_hash): d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?", diff --git a/lbrynet/lbrynet_console/LBRYConsole.py b/lbrynet/lbrynet_console/LBRYConsole.py index 54e442d16..a0b28a98c 100644 --- a/lbrynet/lbrynet_console/LBRYConsole.py +++ b/lbrynet/lbrynet_console/LBRYConsole.py @@ -103,11 +103,9 @@ class LBRYConsole(): def shut_down(self): """Stop the session, all currently running streams, and stop the server""" + d = self._shut_down() if self.session is not None: - d = self.session.shut_down() - else: - d = defer.succeed(True) - d.addCallback(lambda _: self._shut_down()) + d.addCallback(lambda _: self.session.shut_down()) return d def add_control_handlers(self, control_handlers):