From fb4e94b04ab1ab705d802091cb9438e4c9eb704d Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Sat, 15 Dec 2018 15:29:25 -0500 Subject: [PATCH] asyncifying minimal amount of the old blob/p2p code --- lbrynet/blob/EncryptedFileCreator.py | 9 +++-- lbrynet/blob/EncryptedFileDownloader.py | 13 +++--- lbrynet/blob/EncryptedFileManager.py | 40 +++++++++---------- lbrynet/blob/blob_file.py | 14 +++---- .../blob/client/EncryptedFileDownloader.py | 24 +++++------ lbrynet/p2p/BlobManager.py | 13 +++--- lbrynet/p2p/StreamDescriptor.py | 16 +++----- 7 files changed, 60 insertions(+), 69 deletions(-) diff --git a/lbrynet/blob/EncryptedFileCreator.py b/lbrynet/blob/EncryptedFileCreator.py index d8ee1868d..d101e3575 100644 --- a/lbrynet/blob/EncryptedFileCreator.py +++ b/lbrynet/blob/EncryptedFileCreator.py @@ -9,6 +9,7 @@ from binascii import hexlify from twisted.internet import defer from twisted.protocols.basic import FileSender +from lbrynet.extras.compat import f2d from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType from lbrynet.p2p.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor from lbrynet.blob.CryptStreamCreator import CryptStreamCreator @@ -119,13 +120,13 @@ def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_mana sd_hash = yield descriptor_writer.create_descriptor(sd_info) log.debug("saving the stream") - yield storage.store_stream( + yield f2d(storage.store_stream( sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'], sd_info['suggested_file_name'], sd_info['blobs'] - ) + )) log.debug("adding to the file manager") - lbry_file = yield lbry_file_manager.add_published_file( + lbry_file = yield f2d(lbry_file_manager.add_published_file( sd_info['stream_hash'], sd_hash, hexlify(file_directory.encode()), payment_rate_manager, payment_rate_manager.min_blob_data_payment_rate - ) + )) defer.returnValue(lbry_file) diff --git a/lbrynet/blob/EncryptedFileDownloader.py b/lbrynet/blob/EncryptedFileDownloader.py index 108bb2f80..1f30bd95a 100644 --- a/lbrynet/blob/EncryptedFileDownloader.py +++ b/lbrynet/blob/EncryptedFileDownloader.py @@ -6,6 +6,7 @@ from binascii import hexlify, unhexlify from twisted.internet import defer from lbrynet import conf +from lbrynet.extras.compat import f2d from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader from lbrynet.utils import short_hash @@ -70,13 +71,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.channel_name = claim_info['channel_name'] self.metadata = claim_info['value']['stream']['metadata'] - @defer.inlineCallbacks - def get_claim_info(self, include_supports=True): - claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports) + async def get_claim_info(self, include_supports=True): + claim_info = await self.storage.get_content_claim(self.stream_hash, include_supports) if claim_info: self.set_claim_info(claim_info) - - defer.returnValue(claim_info) + return claim_info @property def saving_status(self): @@ -180,10 +179,10 @@ class ManagedEncryptedFileDownloaderFactory: if file_name: file_name = hexlify(file_name.encode()) hex_download_directory = hexlify(download_directory.encode()) - lbry_file = yield self.lbry_file_manager.add_downloaded_file( + lbry_file = yield f2d(self.lbry_file_manager.add_downloaded_file( stream_hash, metadata.source_blob_hash, hex_download_directory, payment_rate_manager, data_rate, file_name=file_name, download_mirrors=download_mirrors - ) + )) defer.returnValue(lbry_file) @staticmethod diff --git a/lbrynet/blob/EncryptedFileManager.py b/lbrynet/blob/EncryptedFileManager.py index 902ecae88..3a25846df 100644 --- a/lbrynet/blob/EncryptedFileManager.py +++ b/lbrynet/blob/EncryptedFileManager.py @@ -8,6 +8,7 @@ from binascii import hexlify, unhexlify from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet import conf +from lbrynet.extras.compat import f2d from lbrynet.extras.reflector.reupload import reflect_file from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -41,11 +42,9 @@ class EncryptedFileManager: self.lbry_files = [] self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) - @defer.inlineCallbacks def setup(self): - yield self._add_to_sd_identifier() - yield self._start_lbry_files() - log.info("Started file manager") + self._add_to_sd_identifier() + return self._start_lbry_files() def get_lbry_file_status(self, lbry_file): return self.storage.get_lbry_file_status(lbry_file.rowid) @@ -115,10 +114,9 @@ class EncryptedFileManager: except Exception: log.warning("Failed to start %i", file_info.get('rowid')) - @defer.inlineCallbacks - def _start_lbry_files(self): - files = yield self.storage.get_all_lbry_files() - claim_infos = yield self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) + async def _start_lbry_files(self): + files = await self.storage.get_all_lbry_files() + claim_infos = await self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) prm = self.payment_rate_manager log.info("Starting %i files", len(files)) @@ -151,14 +149,13 @@ class EncryptedFileManager: log.info("Stopping %i lbry files", len(self.lbry_files)) yield defer.DeferredList([self._stop_lbry_file(lbry_file) for lbry_file in list(self.lbry_files)]) - @defer.inlineCallbacks - def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate): + async def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate): status = ManagedEncryptedFileDownloader.STATUS_FINISHED - stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False) + stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False) key = stream_metadata['key'] stream_name = stream_metadata['stream_name'] file_name = stream_metadata['suggested_file_name'] - rowid = yield self.storage.save_published_file( + rowid = await self.storage.save_published_file( stream_hash, file_name, download_directory, blob_data_rate, status ) lbry_file = self._get_lbry_file( @@ -166,37 +163,36 @@ class EncryptedFileManager: stream_metadata['suggested_file_name'], download_mirrors=None ) lbry_file.restore(status) - yield lbry_file.get_claim_info() + await lbry_file.get_claim_info() self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) - defer.returnValue(lbry_file) + return lbry_file - @defer.inlineCallbacks - def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None, + async def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None, blob_data_rate=None, status=None, file_name=None, download_mirrors=None): status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED payment_rate_manager = payment_rate_manager or self.payment_rate_manager blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate - stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False) + stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False) key = stream_metadata['key'] stream_name = stream_metadata['stream_name'] file_name = file_name or stream_metadata['suggested_file_name'] # when we save the file we'll atomic touch the nearest file to the suggested file name # that doesn't yet exist in the download directory - rowid = yield self.storage.save_downloaded_file( + rowid = await self.storage.save_downloaded_file( stream_hash, hexlify(os.path.basename(unhexlify(file_name))), download_directory, blob_data_rate ) - file_name = (yield self.storage.get_filename_for_rowid(rowid)).decode() + file_name = (await self.storage.get_filename_for_rowid(rowid)).decode() lbry_file = self._get_lbry_file( rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, stream_metadata['suggested_file_name'], download_mirrors ) lbry_file.restore(status) - yield lbry_file.get_claim_info(include_supports=False) + await lbry_file.get_claim_info(include_supports=False) self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) - defer.returnValue(lbry_file) + return lbry_file @defer.inlineCallbacks def delete_lbry_file(self, lbry_file, delete_file=False): @@ -222,7 +218,7 @@ class EncryptedFileManager: del self.storage.content_claim_callbacks[lbry_file.stream_hash] yield lbry_file.delete_data() - yield self.storage.delete_stream(lbry_file.stream_hash) + yield f2d(self.storage.delete_stream(lbry_file.stream_hash)) if delete_file and os.path.isfile(full_path): os.remove(full_path) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 728962971..7c50a5bce 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -1,6 +1,6 @@ import os import logging -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure from lbrynet.cryptoutils import get_lbry_hash_obj @@ -112,19 +112,19 @@ class BlobFile: self._verified = False self.saved_verified_blob = False - def delete_from_file_system(): - if os.path.isfile(self.file_path): - os.remove(self.file_path) + #def delete_from_file_system(): + if os.path.isfile(self.file_path): + os.remove(self.file_path) - d = threads.deferToThread(delete_from_file_system) + #d = threads.deferToThread(delete_from_file_system) def log_error(err): log.warning("An error occurred deleting %s: %s", str(self.file_path), err.getErrorMessage()) return err - d.addErrback(log_error) - return d + #d.addErrback(log_error) + return #d else: return defer.fail(Failure( ValueError("File is currently being read or written and cannot be deleted"))) diff --git a/lbrynet/blob/client/EncryptedFileDownloader.py b/lbrynet/blob/client/EncryptedFileDownloader.py index 059382868..7b270a91c 100644 --- a/lbrynet/blob/client/EncryptedFileDownloader.py +++ b/lbrynet/blob/client/EncryptedFileDownloader.py @@ -4,6 +4,7 @@ import traceback from binascii import hexlify, unhexlify from twisted.internet import defer, threads +from lbrynet.extras.compat import f2d from lbrynet.p2p.StreamDescriptor import save_sd_info from lbrynet.blob.client.CryptStreamDownloader import CryptStreamDownloader from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager @@ -28,16 +29,15 @@ class EncryptedFileDownloader(CryptStreamDownloader): @defer.inlineCallbacks def delete_data(self): - crypt_infos = yield self.storage.get_blobs_for_stream(self.stream_hash) + crypt_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash)) blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash] - sd_hash = yield self.storage.get_sd_blob_hash_for_stream(self.stream_hash) + sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(self.stream_hash)) blob_hashes.append(sd_hash) yield self.blob_manager.delete_blobs(blob_hashes) def stop(self, err=None): - d = self._close_output() - d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err)) - return d + self._close_output() + return CryptStreamDownloader.stop(self, err=err) def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, @@ -174,14 +174,14 @@ class EncryptedFileSaver(EncryptedFileDownloader): def _close_output(self): self.file_handle, file_handle = None, self.file_handle - def close_file(): - if file_handle is not None: - name = file_handle.name - file_handle.close() - if self.completed is False: - os.remove(name) + #def close_file(): + if file_handle is not None: + name = file_handle.name + file_handle.close() + if self.completed is False: + os.remove(name) - return threads.deferToThread(close_file) + #return threads.deferToThread(close_file) def _get_write_func(self): def write_func(data): diff --git a/lbrynet/p2p/BlobManager.py b/lbrynet/p2p/BlobManager.py index 0c604d017..592e9fc8d 100644 --- a/lbrynet/p2p/BlobManager.py +++ b/lbrynet/p2p/BlobManager.py @@ -3,6 +3,7 @@ import os from binascii import unhexlify from sqlite3 import IntegrityError from twisted.internet import threads, defer +from lbrynet.extras.compat import f2d from lbrynet.blob.blob_file import BlobFile from lbrynet.blob.creator import BlobFileCreator @@ -26,15 +27,13 @@ class DiskBlobManager: self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} - @defer.inlineCallbacks - def setup(self): + async def setup(self): if self._node_datastore is not None: - raw_blob_hashes = yield self.storage.get_all_finished_blobs() + raw_blob_hashes = await self.storage.get_all_finished_blobs() self._node_datastore.completed_blobs.update(raw_blob_hashes) - defer.returnValue(True) - def stop(self): - return defer.succeed(True) + async def stop(self): + pass def get_blob(self, blob_hash, length=None): """Return a blob identified by blob_hash, which may be a new blob or a @@ -112,7 +111,7 @@ class DiskBlobManager: except Exception as e: log.warning("Failed to delete blob file. Reason: %s", e) try: - yield self.storage.delete_blobs_from_db(bh_to_delete_from_db) + yield f2d(self.storage.delete_blobs_from_db(bh_to_delete_from_db)) except IntegrityError as err: if str(err) != "FOREIGN KEY constraint failed": raise err diff --git a/lbrynet/p2p/StreamDescriptor.py b/lbrynet/p2p/StreamDescriptor.py index 3b5fddcf1..e140a36c1 100644 --- a/lbrynet/p2p/StreamDescriptor.py +++ b/lbrynet/p2p/StreamDescriptor.py @@ -299,8 +299,7 @@ def format_sd_info(stream_type, stream_name, key, suggested_file_name, stream_ha } -@defer.inlineCallbacks -def get_sd_info(storage, stream_hash, include_blobs): +async def get_sd_info(storage, stream_hash, include_blobs): """ Get an sd info dictionary from storage @@ -329,16 +328,13 @@ def get_sd_info(storage, stream_hash, include_blobs): ] } """ - - stream_info = yield storage.get_stream_info(stream_hash) + stream_info = await storage.get_stream_info(stream_hash) blobs = [] if include_blobs: - blobs = yield storage.get_blobs_for_stream(stream_hash) - defer.returnValue( - format_sd_info( - EncryptedFileStreamType, stream_info[0], stream_info[1], - stream_info[2], stream_hash, format_blobs(blobs) - ) + blobs = await storage.get_blobs_for_stream(stream_hash) + return format_sd_info( + EncryptedFileStreamType, stream_info[0], stream_info[1], + stream_info[2], stream_hash, format_blobs(blobs) )